|
- import time
-
- from unittest import TestCase
- from mock import Mock, MagicMock, patch
-
- from alarmdecoder.decoder import AlarmDecoder
- from alarmdecoder.devices import USBDevice
- from alarmdecoder.messages import Message, RFMessage, LRRMessage, ExpanderMessage
- from alarmdecoder.event.event import Event, EventHandler
- from alarmdecoder.zonetracking import Zonetracker
-
-
- class TestAlarmDecoder(TestCase):
- def setUp(self):
- self._panicked = False
- self._relay_changed = False
- self._power_changed = False
- self._alarmed = False
- self._bypassed = False
- self._battery = False
- self._fire = False
- self._armed = False
- self._got_config = False
- self._message_received = False
- self._rfx_message_received = False
- self._lrr_message_received = False
-
- self._device = Mock(spec=USBDevice)
- self._device.on_open = EventHandler(Event(), self._device)
- self._device.on_close = EventHandler(Event(), self._device)
- self._device.on_read = EventHandler(Event(), self._device)
- self._device.on_write = EventHandler(Event(), self._device)
-
- self._decoder = AlarmDecoder(self._device)
-
- self._decoder._zonetracker = Mock(spec=Zonetracker)
- self._decoder._zonetracker.on_fault = EventHandler(Event(), self._decoder._zonetracker)
- self._decoder._zonetracker.on_restore = EventHandler(Event(), self._decoder._zonetracker)
-
- self._decoder.on_panic += self.on_panic
- self._decoder.on_relay_changed += self.on_relay_changed
- self._decoder.on_power_changed += self.on_power_changed
- self._decoder.on_alarm += self.on_alarm
- self._decoder.on_bypass += self.on_bypass
- self._decoder.on_low_battery += self.on_battery
- self._decoder.on_fire += self.on_fire
- self._decoder.on_arm += self.on_arm
- self._decoder.on_disarm += self.on_disarm
- self._decoder.on_config_received += self.on_config
- self._decoder.on_message += self.on_message
- self._decoder.on_rfx_message += self.on_rfx_message
- self._decoder.on_lrr_message += self.on_lrr_message
-
- self._decoder.address_mask = int('ffffffff', 16)
- self._decoder.open()
-
- def tearDown(self):
- pass
-
- def on_panic(self, sender, *args, **kwargs):
- self._panicked = kwargs['status']
-
- def on_relay_changed(self, sender, *args, **kwargs):
- self._relay_changed = True
-
- def on_power_changed(self, sender, *args, **kwargs):
- self._power_changed = kwargs['status']
-
- def on_alarm(self, sender, *args, **kwargs):
- self._alarmed = kwargs['status']
-
- def on_bypass(self, sender, *args, **kwargs):
- self._bypassed = kwargs['status']
-
- def on_battery(self, sender, *args, **kwargs):
- self._battery = kwargs['status']
-
- def on_fire(self, sender, *args, **kwargs):
- self._fire = kwargs['status']
-
- def on_arm(self, sender, *args, **kwargs):
- self._armed = True
-
- def on_disarm(self, sender, *args, **kwargs):
- self._armed = False
-
- def on_config(self, sender, *args, **kwargs):
- self._got_config = True
-
- def on_message(self, sender, *args, **kwargs):
- self._message_received = True
-
- def on_rfx_message(self, sender, *args, **kwargs):
- self._rfx_message_received = True
-
- def on_lrr_message(self, sender, *args, **kwargs):
- self._lrr_message_received = True
-
- def test_open(self):
- self._decoder.open()
- self._device.open.assert_any_calls()
-
- def test_close(self):
- self._decoder.open()
-
- self._decoder.close()
- self._device.close.assert_any_calls()
-
- def test_send(self):
- self._decoder.send('test')
- self._device.write.assert_called_with('test')
-
- def test_get_config(self):
- self._decoder.get_config()
- self._device.write.assert_called_with("C\r")
-
- def test_save_config(self):
- self._decoder.save_config()
- self._device.write.assert_any_calls()
-
- def test_reboot(self):
- self._decoder.reboot()
- self._device.write.assert_called_with('=')
-
- def test_fault(self):
- self._decoder.fault_zone(1)
- self._device.write.assert_called_with("L{0:02}{1}\r".format(1, 1))
-
- def test_fault_wireproblem(self):
- self._decoder.fault_zone(1, simulate_wire_problem=True)
- self._device.write.assert_called_with("L{0:02}{1}\r".format(1, 2))
-
- def test_clear_zone(self):
- self._decoder.clear_zone(1)
- self._device.write.assert_called_with("L{0:02}0\r".format(1))
-
- def test_message(self):
- msg = self._decoder._handle_message('[0000000000000000----],000,[f707000600e5800c0c020000]," "')
- self.assertIsInstance(msg, Message)
-
- self._decoder._on_read(self, data='[0000000000000000----],000,[f707000600e5800c0c020000]," "')
- self.assertTrue(self._message_received)
-
- def test_message_kpe(self):
- msg = self._decoder._handle_message('!KPE:[0000000000000000----],000,[f707000600e5800c0c020000]," "')
- self.assertIsInstance(msg, Message)
-
- self._decoder._on_read(self, data='[0000000000000000----],000,[f707000600e5800c0c020000]," "')
- self.assertTrue(self._message_received)
-
- def test_expander_message(self):
- msg = self._decoder._handle_message('!EXP:07,01,01')
- self.assertIsInstance(msg, ExpanderMessage)
-
- def test_relay_message(self):
- self._decoder.open()
- msg = self._decoder._handle_message('!REL:12,01,01')
- self.assertIsInstance(msg, ExpanderMessage)
- self.assertEquals(self._relay_changed, True)
-
- def test_rfx_message(self):
- msg = self._decoder._handle_message('!RFX:0180036,80')
- self.assertIsInstance(msg, RFMessage)
- self.assertTrue(self._rfx_message_received)
-
- def test_panic(self):
- self._decoder.open()
-
- msg = self._decoder._handle_message('!LRR:012,1,ALARM_PANIC')
- self.assertEquals(self._panicked, True)
-
- msg = self._decoder._handle_message('!LRR:012,1,CANCEL')
- self.assertEquals(self._panicked, False)
- self.assertIsInstance(msg, LRRMessage)
-
- def test_config_message(self):
- self._decoder.open()
-
- msg = self._decoder._handle_message('!CONFIG>ADDRESS=18&CONFIGBITS=ff00&LRR=N&EXP=NNNNN&REL=NNNN&MASK=ffffffff&DEDUPLICATE=N')
- self.assertEquals(self._decoder.address, 18)
- self.assertEquals(self._decoder.configbits, int('ff00', 16))
- self.assertEquals(self._decoder.address_mask, int('ffffffff', 16))
- self.assertEquals(self._decoder.emulate_zone, [False for x in range(5)])
- self.assertEquals(self._decoder.emulate_relay, [False for x in range(4)])
- self.assertEquals(self._decoder.emulate_lrr, False)
- self.assertEquals(self._decoder.deduplicate, False)
-
- self.assertEquals(self._got_config, True)
-
- def test_power_changed_event(self):
- msg = self._decoder._handle_message('[0000000100000000----],000,[f707000600e5800c0c020000]," "')
- self.assertEquals(self._power_changed, False) # Not set first time we hit it.
-
- msg = self._decoder._handle_message('[0000000000000000----],000,[f707000600e5800c0c020000]," "')
- self.assertEquals(self._power_changed, False)
-
- msg = self._decoder._handle_message('[0000000100000000----],000,[f707000600e5800c0c020000]," "')
- self.assertEquals(self._power_changed, True)
-
- def test_alarm_event(self):
- msg = self._decoder._handle_message('[0000000000100000----],000,[f707000600e5800c0c020000]," "')
- self.assertEquals(self._alarmed, False) # Not set first time we hit it.
-
- msg = self._decoder._handle_message('[0000000000000000----],000,[f707000600e5800c0c020000]," "')
- self.assertEquals(self._alarmed, False)
-
- msg = self._decoder._handle_message('[0000000000100000----],000,[f707000600e5800c0c020000]," "')
- self.assertEquals(self._alarmed, True)
-
- def test_zone_bypassed_event(self):
- msg = self._decoder._handle_message('[0000001000000000----],000,[f707000600e5800c0c020000]," "')
- self.assertEquals(self._bypassed, False) # Not set first time we hit it.
-
- msg = self._decoder._handle_message('[0000000000000000----],000,[f707000600e5800c0c020000]," "')
- self.assertEquals(self._bypassed, False)
-
- msg = self._decoder._handle_message('[0000001000000000----],000,[f707000600e5800c0c020000]," "')
- self.assertEquals(self._bypassed, True)
-
- def test_armed_away_event(self):
- msg = self._decoder._handle_message('[0100000000000000----],000,[f707000600e5800c0c020000]," "')
- self.assertEquals(self._armed, False) # Not set first time we hit it.
-
- msg = self._decoder._handle_message('[0000000000000000----],000,[f707000600e5800c0c020000]," "')
- self.assertEquals(self._armed, False)
-
- msg = self._decoder._handle_message('[0100000000000000----],000,[f707000600e5800c0c020000]," "')
- self.assertEquals(self._armed, True)
-
- self._armed = False
-
- msg = self._decoder._handle_message('[0010000000000000----],000,[f707000600e5800c0c020000]," "')
- self.assertEquals(self._armed, False) # Not set first time we hit it.
-
- msg = self._decoder._handle_message('[0000000000000000----],000,[f707000600e5800c0c020000]," "')
- self.assertEquals(self._armed, False)
-
- msg = self._decoder._handle_message('[0010000000000000----],000,[f707000600e5800c0c020000]," "')
- self.assertEquals(self._armed, True)
-
- def test_battery_low_event(self):
- msg = self._decoder._handle_message('[0000000000010000----],000,[f707000600e5800c0c020000]," "')
- self.assertEquals(self._battery, True)
-
- # force the timeout to expire.
- with patch.object(time, 'time', return_value=self._decoder._battery_status[1] + 35):
- msg = self._decoder._handle_message('[0000000000000000----],000,[f707000600e5800c0c020000]," "')
- self.assertEquals(self._battery, False)
-
- def test_fire_alarm_event(self):
- msg = self._decoder._handle_message('[0000000000000100----],000,[f707000600e5800c0c020000]," "')
- self.assertEquals(self._fire, True)
-
- # force the timeout to expire.
- with patch.object(time, 'time', return_value=self._decoder._battery_status[1] + 35):
- msg = self._decoder._handle_message('[0000000000000000----],000,[f707000600e5800c0c020000]," "')
- self.assertEquals(self._fire, False)
-
- def test_hit_for_faults(self):
- self._decoder._handle_message('[0000000000000000----],000,[f707000600e5800c0c020000],"Hit * for faults "')
-
- self._decoder._device.write.assert_called_with('*')
-
- def test_zonetracker_update(self):
- msg = self._decoder._handle_message('[0000000000000000----],000,[f707000600e5800c0c020000]," "')
- self._decoder._zonetracker.update.assert_called_with(msg)
|