diff --git a/pyad2/tests/__init__.py b/pyad2/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pyad2/tests/test_ad2.py b/pyad2/tests/test_ad2.py new file mode 100644 index 0000000..26288c7 --- /dev/null +++ b/pyad2/tests/test_ad2.py @@ -0,0 +1,323 @@ +import time + +from unittest import TestCase +from mock import Mock, MagicMock, patch + +from ..ad2 import Overseer, AD2 +from ..devices import USBDevice +from ..messages import Message, RFMessage, LRRMessage, ExpanderMessage +from ..event.event import Event, EventHandler +from ..zonetracking import Zonetracker + +class TestOverseer(TestCase): + def setUp(self): + self._attached = False + self._detached = False + + with patch.object(USBDevice, 'find_all', return_value=[(0, 0, 'AD2', 1, 'AD2')]): + self._overseer = Overseer() + + def tearDown(self): + self._overseer.stop() + + def attached_event(self, sender, args): + self._attached = True + + def detached_event(self, sender, args): + self._detached = True + + def test_find_all(self): + with patch.object(USBDevice, 'find_all', return_value=[(0, 0, 'AD2', 1, 'AD2')]): + devices = Overseer.find_all() + + self.assertEquals(devices[0][2], 'AD2') + + def test_create_default_param(self): + with patch.object(USBDevice, 'find_all', return_value=[(0, 0, 'AD2', 1, 'AD2')]): + device = Overseer.create() + + self.assertEquals(device._device.interface, ('AD2', 0)) + + def test_create_with_param(self): + with patch.object(USBDevice, 'find_all', return_value=[(0, 0, 'AD2-1', 1, 'AD2'), (0, 0, 'AD2-2', 1, 'AD2')]): + device = Overseer.create((0, 0, 'AD2-1', 1, 'AD2')) + self.assertEquals(device._device.interface, ('AD2-1', 0)) + + device = Overseer.create((0, 0, 'AD2-2', 1, 'AD2')) + self.assertEquals(device._device.interface, ('AD2-2', 0)) + + def test_events(self): + self.assertEquals(self._attached, False) + self.assertEquals(self._detached, False) + + # this is ugly, but it works. + self._overseer.stop() + self._overseer._detect_thread = Overseer.DetectThread(self._overseer) + self._overseer.on_attached += self.attached_event + self._overseer.on_detached += self.detached_event + + with patch.object(USBDevice, 'find_all', return_value=[(0, 0, 'AD2-1', 1, 'AD2'), (0, 0, 'AD2-2', 1, 'AD2')]): + self._overseer.start() + + with patch.object(USBDevice, 'find_all', return_value=[(0, 0, 'AD2-2', 1, 'AD2')]): + Overseer.find_all() + time.sleep(1) + self._overseer.stop() + + self.assertEquals(self._attached, True) + self.assertEquals(self._detached, True) + +class TestAD2(TestCase): + def setUp(self): + self._panicked = False + self._relay_changed = False + self._power_changed = False + self._alarmed = False + self._bypassed = False + self._battery = (False, 0) + self._fire = (False, 0) + self._armed = False + self._got_config = False + self._message_received = False + self._rfx_message_received = False + self._lrr_message_received = False + + self._device = Mock(spec=USBDevice) + self._device.on_open = EventHandler(Event(), self._device) + self._device.on_close = EventHandler(Event(), self._device) + self._device.on_read = EventHandler(Event(), self._device) + self._device.on_write = EventHandler(Event(), self._device) + + self._ad2 = AD2(self._device) + + self._ad2._zonetracker = Mock(spec=Zonetracker) + self._ad2._zonetracker.on_fault = EventHandler(Event(), self._ad2._zonetracker) + self._ad2._zonetracker.on_restore = EventHandler(Event(), self._ad2._zonetracker) + + self._ad2.on_panic += self.on_panic + self._ad2.on_relay_changed += self.on_relay_changed + self._ad2.on_power_changed += self.on_power_changed + self._ad2.on_alarm += self.on_alarm + self._ad2.on_bypass += self.on_bypass + self._ad2.on_low_battery += self.on_battery + self._ad2.on_fire += self.on_fire + self._ad2.on_arm += self.on_arm + self._ad2.on_disarm += self.on_disarm + self._ad2.on_config_received += self.on_config + self._ad2.on_message += self.on_message + self._ad2.on_rfx_message += self.on_rfx_message + self._ad2.on_lrr_message += self.on_lrr_message + + self._ad2.address_mask = int('ffffffff', 16) + self._ad2.open() + + def tearDown(self): + pass + + def on_panic(self, sender, args): + self._panicked = args + + def on_relay_changed(self, sender, args): + self._relay_changed = True + + def on_power_changed(self, sender, args): + self._power_changed = args + + def on_alarm(self, sender, args): + self._alarmed = args + + def on_bypass(self, sender, args): + self._bypassed = args + + def on_battery(self, sender, args): + self._battery = args + + def on_fire(self, sender, args): + self._fire = args + + def on_arm(self, sender, args): + self._armed = True + + def on_disarm(self, sender, args): + self._armed = False + + def on_config(self, sender, args): + self._got_config = True + + def on_message(self, sender, args): + self._message_received = True + + def on_rfx_message(self, sender, args): + self._rfx_message_received = True + + def on_lrr_message(self, sender, args): + self._lrr_message_received = True + + def test_open(self): + self._ad2.open() + self._device.open.assert_any_calls() + + def test_close(self): + self._ad2.open() + + self._ad2.close() + self._device.close.assert_any_calls() + + def test_send(self): + self._ad2.send('test') + self._device.write.assert_called_with('test') + + def test_get_config(self): + self._ad2.get_config() + self._device.write.assert_called_with("C\r") + + def test_save_config(self): + self._ad2.save_config() + self._device.write.assert_any_calls() + + def test_reboot(self): + self._ad2.reboot() + self._device.write.assert_called_with('=') + + def test_fault(self): + self._ad2.fault_zone(1) + self._device.write.assert_called_with("L{0:02}{1}\r".format(1, 1)) + + def test_fault_wireproblem(self): + self._ad2.fault_zone(1, simulate_wire_problem=True) + self._device.write.assert_called_with("L{0:02}{1}\r".format(1, 2)) + + def test_clear_zone(self): + self._ad2.clear_zone(1) + self._device.write.assert_called_with("L{0:02}0\r".format(1)) + + def test_message(self): + msg = self._ad2._handle_message('[0000000000000000----],000,[f707000600e5800c0c020000]," "') + self.assertIsInstance(msg, Message) + + self._ad2._on_read(self, '[0000000000000000----],000,[f707000600e5800c0c020000]," "') + self.assertTrue(self._message_received) + + def test_message_kpe(self): + msg = self._ad2._handle_message('!KPE:[0000000000000000----],000,[f707000600e5800c0c020000]," "') + self.assertIsInstance(msg, Message) + + self._ad2._on_read(self, '[0000000000000000----],000,[f707000600e5800c0c020000]," "') + self.assertTrue(self._message_received) + + def test_expander_message(self): + msg = self._ad2._handle_message('!EXP:07,01,01') + self.assertIsInstance(msg, ExpanderMessage) + + def test_relay_message(self): + self._ad2.open() + msg = self._ad2._handle_message('!REL:12,01,01') + self.assertIsInstance(msg, ExpanderMessage) + self.assertEquals(self._relay_changed, True) + + def test_rfx_message(self): + msg = self._ad2._handle_message('!RFX:0180036,80') + self.assertIsInstance(msg, RFMessage) + self.assertTrue(self._rfx_message_received) + + def test_panic(self): + self._ad2.open() + + msg = self._ad2._handle_message('!LRR:012,1,ALARM_PANIC') + self.assertEquals(self._panicked, True) + + msg = self._ad2._handle_message('!LRR:012,1,CANCEL') + self.assertEquals(self._panicked, False) + self.assertIsInstance(msg, LRRMessage) + + def test_config_message(self): + self._ad2.open() + + msg = self._ad2._handle_message('!CONFIG>ADDRESS=18&CONFIGBITS=ff00&LRR=N&EXP=NNNNN&REL=NNNN&MASK=ffffffff&DEDUPLICATE=N') + self.assertEquals(self._ad2.address, 18) + self.assertEquals(self._ad2.configbits, int('ff00', 16)) + self.assertEquals(self._ad2.address_mask, int('ffffffff', 16)) + self.assertEquals(self._ad2.emulate_zone, [False for x in range(5)]) + self.assertEquals(self._ad2.emulate_relay, [False for x in range(4)]) + self.assertEquals(self._ad2.emulate_lrr, False) + self.assertEquals(self._ad2.deduplicate, False) + + self.assertEquals(self._got_config, True) + + def test_power_changed_event(self): + msg = self._ad2._handle_message('[0000000100000000----],000,[f707000600e5800c0c020000]," "') + self.assertEquals(self._power_changed, False) # Not set first time we hit it. + + msg = self._ad2._handle_message('[0000000000000000----],000,[f707000600e5800c0c020000]," "') + self.assertEquals(self._power_changed, False) + + msg = self._ad2._handle_message('[0000000100000000----],000,[f707000600e5800c0c020000]," "') + self.assertEquals(self._power_changed, True) + + def test_alarm_event(self): + msg = self._ad2._handle_message('[0000000000100000----],000,[f707000600e5800c0c020000]," "') + self.assertEquals(self._alarmed, False) # Not set first time we hit it. + + msg = self._ad2._handle_message('[0000000000000000----],000,[f707000600e5800c0c020000]," "') + self.assertEquals(self._alarmed, False) + + msg = self._ad2._handle_message('[0000000000100000----],000,[f707000600e5800c0c020000]," "') + self.assertEquals(self._alarmed, True) + + def test_zone_bypassed_event(self): + msg = self._ad2._handle_message('[0000001000000000----],000,[f707000600e5800c0c020000]," "') + self.assertEquals(self._bypassed, False) # Not set first time we hit it. + + msg = self._ad2._handle_message('[0000000000000000----],000,[f707000600e5800c0c020000]," "') + self.assertEquals(self._bypassed, False) + + msg = self._ad2._handle_message('[0000001000000000----],000,[f707000600e5800c0c020000]," "') + self.assertEquals(self._bypassed, True) + + def test_armed_away_event(self): + msg = self._ad2._handle_message('[0100000000000000----],000,[f707000600e5800c0c020000]," "') + self.assertEquals(self._armed, False) # Not set first time we hit it. + + msg = self._ad2._handle_message('[0000000000000000----],000,[f707000600e5800c0c020000]," "') + self.assertEquals(self._armed, False) + + msg = self._ad2._handle_message('[0100000000000000----],000,[f707000600e5800c0c020000]," "') + self.assertEquals(self._armed, True) + + self._armed = False + + msg = self._ad2._handle_message('[0010000000000000----],000,[f707000600e5800c0c020000]," "') + self.assertEquals(self._armed, False) # Not set first time we hit it. + + msg = self._ad2._handle_message('[0000000000000000----],000,[f707000600e5800c0c020000]," "') + self.assertEquals(self._armed, False) + + msg = self._ad2._handle_message('[0010000000000000----],000,[f707000600e5800c0c020000]," "') + self.assertEquals(self._armed, True) + + def test_battery_low_event(self): + msg = self._ad2._handle_message('[0000000000010000----],000,[f707000600e5800c0c020000]," "') + self.assertEquals(self._battery[0], True) + + # force the timeout to expire. + with patch.object(time, 'time', return_value=self._battery[1] + 35): + msg = self._ad2._handle_message('[0000000000000000----],000,[f707000600e5800c0c020000]," "') + self.assertEquals(self._battery[0], False) + + def test_fire_alarm_event(self): + msg = self._ad2._handle_message('[0000000000000100----],000,[f707000600e5800c0c020000]," "') + self.assertEquals(self._fire[0], True) + + # force the timeout to expire. + with patch.object(time, 'time', return_value=self._fire[1] + 35): + msg = self._ad2._handle_message('[0000000000000000----],000,[f707000600e5800c0c020000]," "') + self.assertEquals(self._fire[0], False) + + def test_hit_for_faults(self): + self._ad2._handle_message('[0000000000000000----],000,[f707000600e5800c0c020000],"Hit * for faults "') + + self._ad2._device.write.assert_called_with('*') + + def test_zonetracker_update(self): + msg = self._ad2._handle_message('[0000000000000000----],000,[f707000600e5800c0c020000]," "') + self._ad2._zonetracker.update.assert_called_with(msg) diff --git a/pyad2/tests/test_devices.py b/pyad2/tests/test_devices.py new file mode 100644 index 0000000..cfcf21d --- /dev/null +++ b/pyad2/tests/test_devices.py @@ -0,0 +1,315 @@ +from unittest import TestCase +from mock import Mock, MagicMock, patch +from serial import Serial, SerialException +from pyftdi.pyftdi.ftdi import Ftdi, FtdiError +from usb.core import USBError, Device as USBCoreDevice +import socket +from OpenSSL import SSL, crypto +from ..devices import USBDevice, SerialDevice, SocketDevice +from ..util import NoDeviceError, CommError, TimeoutError + + +class TestUSBDevice(TestCase): + def setUp(self): + self._device = USBDevice() + self._device._device = Mock(spec=Ftdi) + self._device._device.usb_dev = Mock(spec=USBCoreDevice) + self._device._device.usb_dev.bus = 0 + self._device._device.usb_dev.address = 0 + + def tearDown(self): + self._device.close() + + def test_find_all(self): + with patch.object(USBDevice, 'find_all', return_value=[]) as mock: + devices = USBDevice.find_all() + + self.assertEquals(devices, []) + + def test_find_all_exception(self): + with patch.object(Ftdi, 'find_all', side_effect=[USBError('testing'), FtdiError]) as mock: + with self.assertRaises(CommError): + devices = USBDevice.find_all() + + with self.assertRaises(CommError): + devices = USBDevice.find_all() + + def test_open(self): + self._device.interface = ('AD2USB', 0) + + with patch.object(self._device._device, 'open') as mock: + self._device.open(no_reader_thread=True) + + mock.assert_any_calls() + + def test_open_failed(self): + self._device.interface = ('AD2USB', 0) + + with patch.object(self._device._device, 'open', side_effect=[USBError('testing'), FtdiError]): + with self.assertRaises(NoDeviceError): + self._device.open(no_reader_thread=True) + + with self.assertRaises(NoDeviceError): + self._device.open(no_reader_thread=True) + + def test_write(self): + self._device.interface = ('AD2USB', 0) + self._device.open(no_reader_thread=True) + + with patch.object(self._device._device, 'write_data') as mock: + self._device.write('test') + + mock.assert_called_with('test') + + def test_write_exception(self): + with patch.object(self._device._device, 'write_data', side_effect=FtdiError): + with self.assertRaises(CommError): + self._device.write('test') + + def test_read(self): + self._device.interface = ('AD2USB', 0) + self._device.open(no_reader_thread=True) + + with patch.object(self._device._device, 'read_data') as mock: + self._device.read() + + mock.assert_called_with(1) + + def test_read_exception(self): + with patch.object(self._device._device, 'read_data', side_effect=[USBError('testing'), FtdiError]): + with self.assertRaises(CommError): + self._device.read() + + with self.assertRaises(CommError): + self._device.read() + + def test_read_line(self): + with patch.object(self._device._device, 'read_data', side_effect=list("testing\r\n")): + ret = None + try: + ret = self._device.read_line() + except StopIteration: + pass + + self.assertEquals(ret, "testing") + + def test_read_line_timeout(self): + with patch.object(self._device._device, 'read_data', return_value='a') as mock: + with self.assertRaises(TimeoutError): + self._device.read_line(timeout=0.1) + + self.assertIn('a', self._device._buffer) + + def test_read_line_exception(self): + with patch.object(self._device._device, 'read_data', side_effect=[USBError('testing'), FtdiError]): + with self.assertRaises(CommError): + self._device.read_line() + + with self.assertRaises(CommError): + self._device.read_line() + +class TestSerialDevice(TestCase): + def setUp(self): + self._device = SerialDevice() + self._device._device = Mock(spec=Serial) + self._device._device.open = Mock() + + def tearDown(self): + self._device.close() + + def test_open(self): + self._device.interface = '/dev/ttyS0' + + with patch.object(self._device._device, 'open') as mock: + self._device.open(no_reader_thread=True) + + mock.assert_called_with() + + def test_open_no_interface(self): + with self.assertRaises(NoDeviceError): + self._device.open(no_reader_thread=True) + + self.assertFalse(self._device._running) + + def test_open_failed(self): + self._device.interface = '/dev/ttyS0' + + with patch.object(self._device._device, 'open', side_effect=[SerialException, ValueError]): + with self.assertRaises(NoDeviceError): + self._device.open(no_reader_thread=True) + + with self.assertRaises(NoDeviceError): + self._device.open(no_reader_thread=True) + + def test_write(self): + self._device.interface = '/dev/ttyS0' + self._device.open(no_reader_thread=True) + + with patch.object(self._device._device, 'write') as mock: + self._device.write('test') + + mock.assert_called_with('test') + + def test_write_exception(self): + with patch.object(self._device._device, 'write', side_effect=SerialException): + with self.assertRaises(CommError): + self._device.write('test') + + def test_read(self): + self._device.interface = '/dev/ttyS0' + self._device.open(no_reader_thread=True) + + with patch.object(self._device._device, 'read') as mock: + self._device.read() + + mock.assert_called_with(1) + + def test_read_exception(self): + with patch.object(self._device._device, 'read', side_effect=SerialException): + with self.assertRaises(CommError): + self._device.read() + + def test_read_line(self): + with patch.object(self._device._device, 'read', side_effect=list("testing\r\n")): + ret = None + try: + ret = self._device.read_line() + except StopIteration: + pass + + self.assertEquals(ret, "testing") + + def test_read_line_timeout(self): + with patch.object(self._device._device, 'read', return_value='a') as mock: + with self.assertRaises(TimeoutError): + self._device.read_line(timeout=0.1) + + self.assertIn('a', self._device._buffer) + + def test_read_line_exception(self): + with patch.object(self._device._device, 'read', side_effect=[OSError, SerialException]): + with self.assertRaises(CommError): + self._device.read_line() + + with self.assertRaises(CommError): + self._device.read_line() + +class TestSocketDevice(TestCase): + def setUp(self): + self._device = SocketDevice() + self._device._device = Mock(spec=socket.socket) + + def tearDown(self): + self._device.close() + + def test_open(self): + with patch.object(socket.socket, '__init__', return_value=None): + with patch.object(socket.socket, 'connect', return_value=None) as mock: + self._device.open(no_reader_thread=True) + + mock.assert_called_with(self._device.interface) + + def test_open_no_interface(self): + with self.assertRaises(NoDeviceError): + self._device.open(no_reader_thread=True) + + self.assertFalse(self._device._running) + + def test_open_failed(self): + with patch.object(self._device._device, 'connect', side_effect=socket.error): + with self.assertRaises(NoDeviceError): + self._device.open(no_reader_thread=True) + + def test_write(self): + with patch.object(socket.socket, '__init__', return_value=None): + with patch.object(socket.socket, 'connect', return_value=None): + self._device.open(no_reader_thread=True) + + with patch.object(socket.socket, 'send') as mock: + self._device.write('test') + + mock.assert_called_with('test') + + def test_write_exception(self): + with patch.object(self._device._device, 'send', side_effect=[SSL.Error, socket.error]): + with self.assertRaises(CommError): + self._device.write('test') + + def test_read(self): + with patch.object(socket.socket, '__init__', return_value=None): + with patch.object(socket.socket, 'connect', return_value=None): + self._device.open(no_reader_thread=True) + + with patch.object(socket.socket, 'recv') as mock: + self._device.read() + + mock.assert_called_with(1) + + def test_read_exception(self): + with patch.object(self._device._device, 'recv', side_effect=socket.error): + with self.assertRaises(CommError): + self._device.read() + + def test_read_line(self): + with patch.object(self._device._device, 'recv', side_effect=list("testing\r\n")): + ret = None + try: + ret = self._device.read_line() + except StopIteration: + pass + + self.assertEquals(ret, "testing") + + def test_read_line_timeout(self): + with patch.object(self._device._device, 'recv', return_value='a') as mock: + with self.assertRaises(TimeoutError): + self._device.read_line(timeout=0.1) + + self.assertIn('a', self._device._buffer) + + def test_read_line_exception(self): + with patch.object(self._device._device, 'recv', side_effect=socket.error): + with self.assertRaises(CommError): + self._device.read_line() + + with self.assertRaises(CommError): + self._device.read_line() + + def test_ssl(self): + ssl_key = crypto.PKey() + ssl_key.generate_key(crypto.TYPE_RSA, 2048) + ssl_cert = crypto.X509() + ssl_cert.set_pubkey(ssl_key) + ssl_ca_key = crypto.PKey() + ssl_ca_key.generate_key(crypto.TYPE_RSA, 2048) + ssl_ca_cert = crypto.X509() + ssl_ca_cert.set_pubkey(ssl_ca_key) + + self._device.ssl = True + self._device.ssl_key = ssl_key + self._device.ssl_certificate = ssl_cert + self._device.ssl_ca = ssl_ca_cert + + # ..there has to be a better way.. + with patch.object(socket.socket, '__init__', return_value=None): + with patch.object(socket.socket, 'connect', return_value=None) as mock: + with patch.object(socket.socket, '_sock'): + with patch.object(socket.socket, 'fileno', return_value=1): + self._device.open(no_reader_thread=True) + + mock.assert_called_with(self._device.interface) + self.assertIsInstance(self._device._device, SSL.Connection) + + def test_ssl_exception(self): + self._device.ssl = True + self._device.ssl_key = 'None' + self._device.ssl_certificate = 'None' + self._device.ssl_ca = 'None' + + # ..there has to be a better way.. + with patch.object(socket.socket, '__init__', return_value=None): + with patch.object(socket.socket, 'connect', return_value=None) as mock: + with patch.object(socket.socket, '_sock'): + with patch.object(socket.socket, 'fileno', return_value=1): + with self.assertRaises(CommError): + self._device.open(no_reader_thread=True) diff --git a/pyad2/tests/test_messages.py b/pyad2/tests/test_messages.py new file mode 100644 index 0000000..5d7a7d1 --- /dev/null +++ b/pyad2/tests/test_messages.py @@ -0,0 +1,47 @@ +from unittest import TestCase + +from ..messages import Message, ExpanderMessage, RFMessage, LRRMessage +from ..util import InvalidMessageError + +class TestMessages(TestCase): + def setUp(self): + pass + + def tearDown(self): + pass + + def test_message_parse(self): + msg = Message('[0000000000000000----],001,[f707000600e5800c0c020000],"FAULT 1 "') + + self.assertEquals(msg.numeric_code, '001') + + def test_message_parse_fail(self): + with self.assertRaises(InvalidMessageError): + msg = Message('') + + def test_expander_message_parse(self): + msg = ExpanderMessage('!EXP:07,01,01') + + self.assertEquals(msg.address, 7) + + def test_expander_message_parse_fail(self): + with self.assertRaises(InvalidMessageError): + msg = ExpanderMessage('') + + def test_rf_message_parse(self): + msg = RFMessage('!RFX:0180036,80') + + self.assertEquals(msg.serial_number, '0180036') + + def test_rf_message_parse_fail(self): + with self.assertRaises(InvalidMessageError): + msg = RFMessage('') + + def test_lrr_message_parse(self): + msg = LRRMessage('!LRR:012,1,ARM_STAY') + + self.assertEquals(msg.event_type, 'ARM_STAY') + + def test_lrr_message_parse_fail(self): + with self.assertRaises(InvalidMessageError): + msg = LRRMessage('') diff --git a/pyad2/tests/test_util.py b/pyad2/tests/test_util.py new file mode 100644 index 0000000..e69de29 diff --git a/pyad2/tests/test_zonetracking.py b/pyad2/tests/test_zonetracking.py new file mode 100644 index 0000000..32ecb2c --- /dev/null +++ b/pyad2/tests/test_zonetracking.py @@ -0,0 +1,144 @@ +from unittest import TestCase +from mock import Mock, MagicMock + +from ..messages import Message, ExpanderMessage +from ..zonetracking import Zonetracker, Zone + +class TestZonetracking(TestCase): + def setUp(self): + self._zonetracker = Zonetracker() + + self._zonetracker.on_fault += self.fault_event + self._zonetracker.on_restore += self.restore_event + + self._faulted = False + self._restored = False + + def tearDown(self): + pass + + def fault_event(self, sender, args): + self._faulted = True + + def restore_event(self, sender, args): + self._restored = True + + def _build_expander_message(self, msg): + msg = ExpanderMessage(msg) + zone = self._zonetracker._expander_to_zone(msg.address, msg.channel) + + return zone, msg + + def test_zone_fault(self): + zone, msg = self._build_expander_message('!EXP:07,01,01') + self._zonetracker.update(msg) + + self.assertEquals(self._zonetracker._zones[zone].status, Zone.FAULT) + self.assertTrue(self._faulted) + + def test_zone_restore(self): + zone, msg = self._build_expander_message('!EXP:07,01,01') + self._zonetracker.update(msg) + + zone, msg = self._build_expander_message('!EXP:07,01,00') + self._zonetracker.update(msg) + + self.assertEquals(self._zonetracker._zones[zone].status, Zone.CLEAR) + self.assertTrue(self._restored) + + def test_message_ready(self): + msg = Message('[0000000000000010----],001,[f707000600e5800c0c020000]," "') + self._zonetracker.update(msg) + + self.assertEquals(len(self._zonetracker._zones_faulted), 1) + + msg = Message('[1000000000000000----],000,[f707000600e5800c0c020000]," "') + self._zonetracker.update(msg) + + self.assertEquals(len(self._zonetracker._zones_faulted), 0) + + def test_message_fault_text(self): + msg = Message('[0000000000000000----],001,[f707000600e5800c0c020000],"FAULT 1 "') + self._zonetracker.update(msg) + + self.assertEquals(len(self._zonetracker._zones_faulted), 1) + + def test_ECP_failure(self): + msg = Message('[0000000000000010----],0bf,[f707000600e5800c0c020000],"CHECK 1 "') + self._zonetracker.update(msg) + + self.assertEquals(self._zonetracker._zones['1'].status, Zone.CHECK) + + def test_zone_restore_skip(self): + panel_messages = [ + '[0000000000000000----],001,[f707000600e5800c0c020000],"FAULT 1 "', + '[0000000000000000----],002,[f707000600e5800c0c020000],"FAULT 2 "', + '[0000000000000000----],001,[f707000600e5800c0c020000],"FAULT 1 "', + '[0000000000000000----],001,[f707000600e5800c0c020000],"FAULT 1 "' + ] + + for m in panel_messages: + msg = Message(m) + + self._zonetracker.update(msg) + + self.assertIn(1, self._zonetracker._zones_faulted) + self.assertNotIn(2, self._zonetracker._zones_faulted) + + def test_zone_out_of_order_fault(self): + panel_messages = [ + '[0000000000000010----],001,[f707000600e5800c0c020000],"FAULT 1 "', + '[0000000000000010----],004,[f707000600e5800c0c020000],"FAULT 4 "', + '[0000000000000010----],003,[f707000600e5800c0c020000],"FAULT 3 "', + '[0000000000000010----],004,[f707000600e5800c0c020000],"FAULT 4 "', + ] + + for m in panel_messages: + msg = Message(m) + + self._zonetracker.update(msg) + + self.assertIn(1, self._zonetracker._zones_faulted) + self.assertIn(3, self._zonetracker._zones_faulted) + self.assertIn(4, self._zonetracker._zones_faulted) + + def test_zone_multi_zone_skip_restore(self): + panel_messages = [ + '[0000000000000010----],001,[f707000600e5800c0c020000],"FAULT 1 "', + '[0000000000000010----],004,[f707000600e5800c0c020000],"FAULT 4 "', + '[0000000000000010----],002,[f707000600e5800c0c020000],"FAULT 2 "', + '[0000000000000010----],004,[f707000600e5800c0c020000],"FAULT 4 "', + '[0000000000000010----],004,[f707000600e5800c0c020000],"FAULT 4 "', + ] + + for m in panel_messages: + msg = Message(m) + + self._zonetracker.update(msg) + + self.assertNotIn(1, self._zonetracker._zones_faulted) + self.assertNotIn(2, self._zonetracker._zones_faulted) + self.assertIn(4, self._zonetracker._zones_faulted) + + def test_zone_timeout_restore(self): + panel_messages = [ + '[0000000000000010----],001,[f707000600e5800c0c020000],"FAULT 1 "', + '[0000000000000010----],004,[f707000600e5800c0c020000],"FAULT 4 "', + '[0000000000000010----],002,[f707000600e5800c0c020000],"FAULT 2 "', + '[0000000000000010----],004,[f707000600e5800c0c020000],"FAULT 4 "', + '[0000000000000010----],004,[f707000600e5800c0c020000],"FAULT 4 "', + ] + + for m in panel_messages: + msg = Message(m) + + self._zonetracker.update(msg) + + self.assertIn(4, self._zonetracker._zones_faulted) + self._zonetracker._zones[4].timestamp -= 35 # forcefully expire the zone + + # generic message to force an update. + msg = Message('[0000000000000000----],000,[f707000600e5800c0c020000]," "') + self._zonetracker.update(msg) + + self.assertNotIn(4, self._zonetracker._zones_faulted) diff --git a/setup.py b/setup.py index d51ce6d..c071936 100644 --- a/setup.py +++ b/setup.py @@ -21,9 +21,11 @@ setup(name='pyad2', license='', packages=['pyad2'], install_requires=[ - 'OpenSSL', + 'pyopenssl', 'pyftdi' ], + test_suite='nose.collector', + tests_require=['nose', 'mock'], scripts=['bin/ad2-sslterm', 'bin/ad2-firmwareupload'], include_package_data=True, zip_safe=False)