Browse Source

Added unit tests.

pyserial_fix
Scott Petersen 11 years ago
parent
commit
6cc390c0c1
7 changed files with 832 additions and 1 deletions
  1. +0
    -0
      pyad2/tests/__init__.py
  2. +323
    -0
      pyad2/tests/test_ad2.py
  3. +315
    -0
      pyad2/tests/test_devices.py
  4. +47
    -0
      pyad2/tests/test_messages.py
  5. +0
    -0
      pyad2/tests/test_util.py
  6. +144
    -0
      pyad2/tests/test_zonetracking.py
  7. +3
    -1
      setup.py

+ 0
- 0
pyad2/tests/__init__.py View File


+ 323
- 0
pyad2/tests/test_ad2.py View File

@@ -0,0 +1,323 @@
import time

from unittest import TestCase
from mock import Mock, MagicMock, patch

from ..ad2 import Overseer, AD2
from ..devices import USBDevice
from ..messages import Message, RFMessage, LRRMessage, ExpanderMessage
from ..event.event import Event, EventHandler
from ..zonetracking import Zonetracker

class TestOverseer(TestCase):
def setUp(self):
self._attached = False
self._detached = False

with patch.object(USBDevice, 'find_all', return_value=[(0, 0, 'AD2', 1, 'AD2')]):
self._overseer = Overseer()

def tearDown(self):
self._overseer.stop()

def attached_event(self, sender, args):
self._attached = True

def detached_event(self, sender, args):
self._detached = True

def test_find_all(self):
with patch.object(USBDevice, 'find_all', return_value=[(0, 0, 'AD2', 1, 'AD2')]):
devices = Overseer.find_all()

self.assertEquals(devices[0][2], 'AD2')

def test_create_default_param(self):
with patch.object(USBDevice, 'find_all', return_value=[(0, 0, 'AD2', 1, 'AD2')]):
device = Overseer.create()

self.assertEquals(device._device.interface, ('AD2', 0))

def test_create_with_param(self):
with patch.object(USBDevice, 'find_all', return_value=[(0, 0, 'AD2-1', 1, 'AD2'), (0, 0, 'AD2-2', 1, 'AD2')]):
device = Overseer.create((0, 0, 'AD2-1', 1, 'AD2'))
self.assertEquals(device._device.interface, ('AD2-1', 0))

device = Overseer.create((0, 0, 'AD2-2', 1, 'AD2'))
self.assertEquals(device._device.interface, ('AD2-2', 0))

def test_events(self):
self.assertEquals(self._attached, False)
self.assertEquals(self._detached, False)

# this is ugly, but it works.
self._overseer.stop()
self._overseer._detect_thread = Overseer.DetectThread(self._overseer)
self._overseer.on_attached += self.attached_event
self._overseer.on_detached += self.detached_event

with patch.object(USBDevice, 'find_all', return_value=[(0, 0, 'AD2-1', 1, 'AD2'), (0, 0, 'AD2-2', 1, 'AD2')]):
self._overseer.start()

with patch.object(USBDevice, 'find_all', return_value=[(0, 0, 'AD2-2', 1, 'AD2')]):
Overseer.find_all()
time.sleep(1)
self._overseer.stop()

self.assertEquals(self._attached, True)
self.assertEquals(self._detached, True)

class TestAD2(TestCase):
def setUp(self):
self._panicked = False
self._relay_changed = False
self._power_changed = False
self._alarmed = False
self._bypassed = False
self._battery = (False, 0)
self._fire = (False, 0)
self._armed = False
self._got_config = False
self._message_received = False
self._rfx_message_received = False
self._lrr_message_received = False

self._device = Mock(spec=USBDevice)
self._device.on_open = EventHandler(Event(), self._device)
self._device.on_close = EventHandler(Event(), self._device)
self._device.on_read = EventHandler(Event(), self._device)
self._device.on_write = EventHandler(Event(), self._device)

self._ad2 = AD2(self._device)

self._ad2._zonetracker = Mock(spec=Zonetracker)
self._ad2._zonetracker.on_fault = EventHandler(Event(), self._ad2._zonetracker)
self._ad2._zonetracker.on_restore = EventHandler(Event(), self._ad2._zonetracker)

self._ad2.on_panic += self.on_panic
self._ad2.on_relay_changed += self.on_relay_changed
self._ad2.on_power_changed += self.on_power_changed
self._ad2.on_alarm += self.on_alarm
self._ad2.on_bypass += self.on_bypass
self._ad2.on_low_battery += self.on_battery
self._ad2.on_fire += self.on_fire
self._ad2.on_arm += self.on_arm
self._ad2.on_disarm += self.on_disarm
self._ad2.on_config_received += self.on_config
self._ad2.on_message += self.on_message
self._ad2.on_rfx_message += self.on_rfx_message
self._ad2.on_lrr_message += self.on_lrr_message

self._ad2.address_mask = int('ffffffff', 16)
self._ad2.open()

def tearDown(self):
pass

def on_panic(self, sender, args):
self._panicked = args

def on_relay_changed(self, sender, args):
self._relay_changed = True

def on_power_changed(self, sender, args):
self._power_changed = args

def on_alarm(self, sender, args):
self._alarmed = args

def on_bypass(self, sender, args):
self._bypassed = args

def on_battery(self, sender, args):
self._battery = args

def on_fire(self, sender, args):
self._fire = args

def on_arm(self, sender, args):
self._armed = True

def on_disarm(self, sender, args):
self._armed = False

def on_config(self, sender, args):
self._got_config = True

def on_message(self, sender, args):
self._message_received = True

def on_rfx_message(self, sender, args):
self._rfx_message_received = True

def on_lrr_message(self, sender, args):
self._lrr_message_received = True

def test_open(self):
self._ad2.open()
self._device.open.assert_any_calls()

def test_close(self):
self._ad2.open()

self._ad2.close()
self._device.close.assert_any_calls()

def test_send(self):
self._ad2.send('test')
self._device.write.assert_called_with('test')

def test_get_config(self):
self._ad2.get_config()
self._device.write.assert_called_with("C\r")

def test_save_config(self):
self._ad2.save_config()
self._device.write.assert_any_calls()

def test_reboot(self):
self._ad2.reboot()
self._device.write.assert_called_with('=')

def test_fault(self):
self._ad2.fault_zone(1)
self._device.write.assert_called_with("L{0:02}{1}\r".format(1, 1))

def test_fault_wireproblem(self):
self._ad2.fault_zone(1, simulate_wire_problem=True)
self._device.write.assert_called_with("L{0:02}{1}\r".format(1, 2))

def test_clear_zone(self):
self._ad2.clear_zone(1)
self._device.write.assert_called_with("L{0:02}0\r".format(1))

def test_message(self):
msg = self._ad2._handle_message('[0000000000000000----],000,[f707000600e5800c0c020000]," "')
self.assertIsInstance(msg, Message)

self._ad2._on_read(self, '[0000000000000000----],000,[f707000600e5800c0c020000]," "')
self.assertTrue(self._message_received)

def test_message_kpe(self):
msg = self._ad2._handle_message('!KPE:[0000000000000000----],000,[f707000600e5800c0c020000]," "')
self.assertIsInstance(msg, Message)

self._ad2._on_read(self, '[0000000000000000----],000,[f707000600e5800c0c020000]," "')
self.assertTrue(self._message_received)

def test_expander_message(self):
msg = self._ad2._handle_message('!EXP:07,01,01')
self.assertIsInstance(msg, ExpanderMessage)

def test_relay_message(self):
self._ad2.open()
msg = self._ad2._handle_message('!REL:12,01,01')
self.assertIsInstance(msg, ExpanderMessage)
self.assertEquals(self._relay_changed, True)

def test_rfx_message(self):
msg = self._ad2._handle_message('!RFX:0180036,80')
self.assertIsInstance(msg, RFMessage)
self.assertTrue(self._rfx_message_received)

def test_panic(self):
self._ad2.open()

msg = self._ad2._handle_message('!LRR:012,1,ALARM_PANIC')
self.assertEquals(self._panicked, True)

msg = self._ad2._handle_message('!LRR:012,1,CANCEL')
self.assertEquals(self._panicked, False)
self.assertIsInstance(msg, LRRMessage)

def test_config_message(self):
self._ad2.open()

msg = self._ad2._handle_message('!CONFIG>ADDRESS=18&CONFIGBITS=ff00&LRR=N&EXP=NNNNN&REL=NNNN&MASK=ffffffff&DEDUPLICATE=N')
self.assertEquals(self._ad2.address, 18)
self.assertEquals(self._ad2.configbits, int('ff00', 16))
self.assertEquals(self._ad2.address_mask, int('ffffffff', 16))
self.assertEquals(self._ad2.emulate_zone, [False for x in range(5)])
self.assertEquals(self._ad2.emulate_relay, [False for x in range(4)])
self.assertEquals(self._ad2.emulate_lrr, False)
self.assertEquals(self._ad2.deduplicate, False)

self.assertEquals(self._got_config, True)

def test_power_changed_event(self):
msg = self._ad2._handle_message('[0000000100000000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._power_changed, False) # Not set first time we hit it.

msg = self._ad2._handle_message('[0000000000000000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._power_changed, False)

msg = self._ad2._handle_message('[0000000100000000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._power_changed, True)

def test_alarm_event(self):
msg = self._ad2._handle_message('[0000000000100000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._alarmed, False) # Not set first time we hit it.

msg = self._ad2._handle_message('[0000000000000000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._alarmed, False)

msg = self._ad2._handle_message('[0000000000100000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._alarmed, True)

def test_zone_bypassed_event(self):
msg = self._ad2._handle_message('[0000001000000000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._bypassed, False) # Not set first time we hit it.

msg = self._ad2._handle_message('[0000000000000000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._bypassed, False)

msg = self._ad2._handle_message('[0000001000000000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._bypassed, True)

def test_armed_away_event(self):
msg = self._ad2._handle_message('[0100000000000000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._armed, False) # Not set first time we hit it.

msg = self._ad2._handle_message('[0000000000000000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._armed, False)

msg = self._ad2._handle_message('[0100000000000000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._armed, True)

self._armed = False

msg = self._ad2._handle_message('[0010000000000000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._armed, False) # Not set first time we hit it.

msg = self._ad2._handle_message('[0000000000000000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._armed, False)

msg = self._ad2._handle_message('[0010000000000000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._armed, True)

def test_battery_low_event(self):
msg = self._ad2._handle_message('[0000000000010000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._battery[0], True)

# force the timeout to expire.
with patch.object(time, 'time', return_value=self._battery[1] + 35):
msg = self._ad2._handle_message('[0000000000000000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._battery[0], False)

def test_fire_alarm_event(self):
msg = self._ad2._handle_message('[0000000000000100----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._fire[0], True)

# force the timeout to expire.
with patch.object(time, 'time', return_value=self._fire[1] + 35):
msg = self._ad2._handle_message('[0000000000000000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._fire[0], False)

def test_hit_for_faults(self):
self._ad2._handle_message('[0000000000000000----],000,[f707000600e5800c0c020000],"Hit * for faults "')

self._ad2._device.write.assert_called_with('*')

def test_zonetracker_update(self):
msg = self._ad2._handle_message('[0000000000000000----],000,[f707000600e5800c0c020000]," "')
self._ad2._zonetracker.update.assert_called_with(msg)

+ 315
- 0
pyad2/tests/test_devices.py View File

@@ -0,0 +1,315 @@
from unittest import TestCase
from mock import Mock, MagicMock, patch
from serial import Serial, SerialException
from pyftdi.pyftdi.ftdi import Ftdi, FtdiError
from usb.core import USBError, Device as USBCoreDevice
import socket
from OpenSSL import SSL, crypto
from ..devices import USBDevice, SerialDevice, SocketDevice
from ..util import NoDeviceError, CommError, TimeoutError


class TestUSBDevice(TestCase):
def setUp(self):
self._device = USBDevice()
self._device._device = Mock(spec=Ftdi)
self._device._device.usb_dev = Mock(spec=USBCoreDevice)
self._device._device.usb_dev.bus = 0
self._device._device.usb_dev.address = 0

def tearDown(self):
self._device.close()

def test_find_all(self):
with patch.object(USBDevice, 'find_all', return_value=[]) as mock:
devices = USBDevice.find_all()

self.assertEquals(devices, [])

def test_find_all_exception(self):
with patch.object(Ftdi, 'find_all', side_effect=[USBError('testing'), FtdiError]) as mock:
with self.assertRaises(CommError):
devices = USBDevice.find_all()

with self.assertRaises(CommError):
devices = USBDevice.find_all()

def test_open(self):
self._device.interface = ('AD2USB', 0)

with patch.object(self._device._device, 'open') as mock:
self._device.open(no_reader_thread=True)

mock.assert_any_calls()

def test_open_failed(self):
self._device.interface = ('AD2USB', 0)

with patch.object(self._device._device, 'open', side_effect=[USBError('testing'), FtdiError]):
with self.assertRaises(NoDeviceError):
self._device.open(no_reader_thread=True)

with self.assertRaises(NoDeviceError):
self._device.open(no_reader_thread=True)

def test_write(self):
self._device.interface = ('AD2USB', 0)
self._device.open(no_reader_thread=True)

with patch.object(self._device._device, 'write_data') as mock:
self._device.write('test')

mock.assert_called_with('test')

def test_write_exception(self):
with patch.object(self._device._device, 'write_data', side_effect=FtdiError):
with self.assertRaises(CommError):
self._device.write('test')

def test_read(self):
self._device.interface = ('AD2USB', 0)
self._device.open(no_reader_thread=True)

with patch.object(self._device._device, 'read_data') as mock:
self._device.read()

mock.assert_called_with(1)

def test_read_exception(self):
with patch.object(self._device._device, 'read_data', side_effect=[USBError('testing'), FtdiError]):
with self.assertRaises(CommError):
self._device.read()

with self.assertRaises(CommError):
self._device.read()

def test_read_line(self):
with patch.object(self._device._device, 'read_data', side_effect=list("testing\r\n")):
ret = None
try:
ret = self._device.read_line()
except StopIteration:
pass

self.assertEquals(ret, "testing")

def test_read_line_timeout(self):
with patch.object(self._device._device, 'read_data', return_value='a') as mock:
with self.assertRaises(TimeoutError):
self._device.read_line(timeout=0.1)

self.assertIn('a', self._device._buffer)

def test_read_line_exception(self):
with patch.object(self._device._device, 'read_data', side_effect=[USBError('testing'), FtdiError]):
with self.assertRaises(CommError):
self._device.read_line()

with self.assertRaises(CommError):
self._device.read_line()

class TestSerialDevice(TestCase):
def setUp(self):
self._device = SerialDevice()
self._device._device = Mock(spec=Serial)
self._device._device.open = Mock()

def tearDown(self):
self._device.close()

def test_open(self):
self._device.interface = '/dev/ttyS0'

with patch.object(self._device._device, 'open') as mock:
self._device.open(no_reader_thread=True)

mock.assert_called_with()

def test_open_no_interface(self):
with self.assertRaises(NoDeviceError):
self._device.open(no_reader_thread=True)

self.assertFalse(self._device._running)

def test_open_failed(self):
self._device.interface = '/dev/ttyS0'

with patch.object(self._device._device, 'open', side_effect=[SerialException, ValueError]):
with self.assertRaises(NoDeviceError):
self._device.open(no_reader_thread=True)

with self.assertRaises(NoDeviceError):
self._device.open(no_reader_thread=True)

def test_write(self):
self._device.interface = '/dev/ttyS0'
self._device.open(no_reader_thread=True)

with patch.object(self._device._device, 'write') as mock:
self._device.write('test')

mock.assert_called_with('test')

def test_write_exception(self):
with patch.object(self._device._device, 'write', side_effect=SerialException):
with self.assertRaises(CommError):
self._device.write('test')

def test_read(self):
self._device.interface = '/dev/ttyS0'
self._device.open(no_reader_thread=True)

with patch.object(self._device._device, 'read') as mock:
self._device.read()

mock.assert_called_with(1)

def test_read_exception(self):
with patch.object(self._device._device, 'read', side_effect=SerialException):
with self.assertRaises(CommError):
self._device.read()

def test_read_line(self):
with patch.object(self._device._device, 'read', side_effect=list("testing\r\n")):
ret = None
try:
ret = self._device.read_line()
except StopIteration:
pass

self.assertEquals(ret, "testing")

def test_read_line_timeout(self):
with patch.object(self._device._device, 'read', return_value='a') as mock:
with self.assertRaises(TimeoutError):
self._device.read_line(timeout=0.1)

self.assertIn('a', self._device._buffer)

def test_read_line_exception(self):
with patch.object(self._device._device, 'read', side_effect=[OSError, SerialException]):
with self.assertRaises(CommError):
self._device.read_line()

with self.assertRaises(CommError):
self._device.read_line()

class TestSocketDevice(TestCase):
def setUp(self):
self._device = SocketDevice()
self._device._device = Mock(spec=socket.socket)

def tearDown(self):
self._device.close()

def test_open(self):
with patch.object(socket.socket, '__init__', return_value=None):
with patch.object(socket.socket, 'connect', return_value=None) as mock:
self._device.open(no_reader_thread=True)

mock.assert_called_with(self._device.interface)

def test_open_no_interface(self):
with self.assertRaises(NoDeviceError):
self._device.open(no_reader_thread=True)

self.assertFalse(self._device._running)

def test_open_failed(self):
with patch.object(self._device._device, 'connect', side_effect=socket.error):
with self.assertRaises(NoDeviceError):
self._device.open(no_reader_thread=True)

def test_write(self):
with patch.object(socket.socket, '__init__', return_value=None):
with patch.object(socket.socket, 'connect', return_value=None):
self._device.open(no_reader_thread=True)

with patch.object(socket.socket, 'send') as mock:
self._device.write('test')

mock.assert_called_with('test')

def test_write_exception(self):
with patch.object(self._device._device, 'send', side_effect=[SSL.Error, socket.error]):
with self.assertRaises(CommError):
self._device.write('test')

def test_read(self):
with patch.object(socket.socket, '__init__', return_value=None):
with patch.object(socket.socket, 'connect', return_value=None):
self._device.open(no_reader_thread=True)

with patch.object(socket.socket, 'recv') as mock:
self._device.read()

mock.assert_called_with(1)

def test_read_exception(self):
with patch.object(self._device._device, 'recv', side_effect=socket.error):
with self.assertRaises(CommError):
self._device.read()

def test_read_line(self):
with patch.object(self._device._device, 'recv', side_effect=list("testing\r\n")):
ret = None
try:
ret = self._device.read_line()
except StopIteration:
pass

self.assertEquals(ret, "testing")

def test_read_line_timeout(self):
with patch.object(self._device._device, 'recv', return_value='a') as mock:
with self.assertRaises(TimeoutError):
self._device.read_line(timeout=0.1)

self.assertIn('a', self._device._buffer)

def test_read_line_exception(self):
with patch.object(self._device._device, 'recv', side_effect=socket.error):
with self.assertRaises(CommError):
self._device.read_line()

with self.assertRaises(CommError):
self._device.read_line()

def test_ssl(self):
ssl_key = crypto.PKey()
ssl_key.generate_key(crypto.TYPE_RSA, 2048)
ssl_cert = crypto.X509()
ssl_cert.set_pubkey(ssl_key)
ssl_ca_key = crypto.PKey()
ssl_ca_key.generate_key(crypto.TYPE_RSA, 2048)
ssl_ca_cert = crypto.X509()
ssl_ca_cert.set_pubkey(ssl_ca_key)

self._device.ssl = True
self._device.ssl_key = ssl_key
self._device.ssl_certificate = ssl_cert
self._device.ssl_ca = ssl_ca_cert

# ..there has to be a better way..
with patch.object(socket.socket, '__init__', return_value=None):
with patch.object(socket.socket, 'connect', return_value=None) as mock:
with patch.object(socket.socket, '_sock'):
with patch.object(socket.socket, 'fileno', return_value=1):
self._device.open(no_reader_thread=True)

mock.assert_called_with(self._device.interface)
self.assertIsInstance(self._device._device, SSL.Connection)

def test_ssl_exception(self):
self._device.ssl = True
self._device.ssl_key = 'None'
self._device.ssl_certificate = 'None'
self._device.ssl_ca = 'None'

# ..there has to be a better way..
with patch.object(socket.socket, '__init__', return_value=None):
with patch.object(socket.socket, 'connect', return_value=None) as mock:
with patch.object(socket.socket, '_sock'):
with patch.object(socket.socket, 'fileno', return_value=1):
with self.assertRaises(CommError):
self._device.open(no_reader_thread=True)

+ 47
- 0
pyad2/tests/test_messages.py View File

@@ -0,0 +1,47 @@
from unittest import TestCase

from ..messages import Message, ExpanderMessage, RFMessage, LRRMessage
from ..util import InvalidMessageError

class TestMessages(TestCase):
def setUp(self):
pass

def tearDown(self):
pass

def test_message_parse(self):
msg = Message('[0000000000000000----],001,[f707000600e5800c0c020000],"FAULT 1 "')

self.assertEquals(msg.numeric_code, '001')

def test_message_parse_fail(self):
with self.assertRaises(InvalidMessageError):
msg = Message('')

def test_expander_message_parse(self):
msg = ExpanderMessage('!EXP:07,01,01')

self.assertEquals(msg.address, 7)

def test_expander_message_parse_fail(self):
with self.assertRaises(InvalidMessageError):
msg = ExpanderMessage('')

def test_rf_message_parse(self):
msg = RFMessage('!RFX:0180036,80')

self.assertEquals(msg.serial_number, '0180036')

def test_rf_message_parse_fail(self):
with self.assertRaises(InvalidMessageError):
msg = RFMessage('')

def test_lrr_message_parse(self):
msg = LRRMessage('!LRR:012,1,ARM_STAY')

self.assertEquals(msg.event_type, 'ARM_STAY')

def test_lrr_message_parse_fail(self):
with self.assertRaises(InvalidMessageError):
msg = LRRMessage('')

+ 0
- 0
pyad2/tests/test_util.py View File


+ 144
- 0
pyad2/tests/test_zonetracking.py View File

@@ -0,0 +1,144 @@
from unittest import TestCase
from mock import Mock, MagicMock

from ..messages import Message, ExpanderMessage
from ..zonetracking import Zonetracker, Zone

class TestZonetracking(TestCase):
def setUp(self):
self._zonetracker = Zonetracker()

self._zonetracker.on_fault += self.fault_event
self._zonetracker.on_restore += self.restore_event

self._faulted = False
self._restored = False

def tearDown(self):
pass

def fault_event(self, sender, args):
self._faulted = True

def restore_event(self, sender, args):
self._restored = True

def _build_expander_message(self, msg):
msg = ExpanderMessage(msg)
zone = self._zonetracker._expander_to_zone(msg.address, msg.channel)

return zone, msg

def test_zone_fault(self):
zone, msg = self._build_expander_message('!EXP:07,01,01')
self._zonetracker.update(msg)

self.assertEquals(self._zonetracker._zones[zone].status, Zone.FAULT)
self.assertTrue(self._faulted)

def test_zone_restore(self):
zone, msg = self._build_expander_message('!EXP:07,01,01')
self._zonetracker.update(msg)

zone, msg = self._build_expander_message('!EXP:07,01,00')
self._zonetracker.update(msg)

self.assertEquals(self._zonetracker._zones[zone].status, Zone.CLEAR)
self.assertTrue(self._restored)

def test_message_ready(self):
msg = Message('[0000000000000010----],001,[f707000600e5800c0c020000]," "')
self._zonetracker.update(msg)

self.assertEquals(len(self._zonetracker._zones_faulted), 1)

msg = Message('[1000000000000000----],000,[f707000600e5800c0c020000]," "')
self._zonetracker.update(msg)

self.assertEquals(len(self._zonetracker._zones_faulted), 0)

def test_message_fault_text(self):
msg = Message('[0000000000000000----],001,[f707000600e5800c0c020000],"FAULT 1 "')
self._zonetracker.update(msg)

self.assertEquals(len(self._zonetracker._zones_faulted), 1)

def test_ECP_failure(self):
msg = Message('[0000000000000010----],0bf,[f707000600e5800c0c020000],"CHECK 1 "')
self._zonetracker.update(msg)

self.assertEquals(self._zonetracker._zones['1'].status, Zone.CHECK)

def test_zone_restore_skip(self):
panel_messages = [
'[0000000000000000----],001,[f707000600e5800c0c020000],"FAULT 1 "',
'[0000000000000000----],002,[f707000600e5800c0c020000],"FAULT 2 "',
'[0000000000000000----],001,[f707000600e5800c0c020000],"FAULT 1 "',
'[0000000000000000----],001,[f707000600e5800c0c020000],"FAULT 1 "'
]

for m in panel_messages:
msg = Message(m)

self._zonetracker.update(msg)

self.assertIn(1, self._zonetracker._zones_faulted)
self.assertNotIn(2, self._zonetracker._zones_faulted)

def test_zone_out_of_order_fault(self):
panel_messages = [
'[0000000000000010----],001,[f707000600e5800c0c020000],"FAULT 1 "',
'[0000000000000010----],004,[f707000600e5800c0c020000],"FAULT 4 "',
'[0000000000000010----],003,[f707000600e5800c0c020000],"FAULT 3 "',
'[0000000000000010----],004,[f707000600e5800c0c020000],"FAULT 4 "',
]

for m in panel_messages:
msg = Message(m)

self._zonetracker.update(msg)

self.assertIn(1, self._zonetracker._zones_faulted)
self.assertIn(3, self._zonetracker._zones_faulted)
self.assertIn(4, self._zonetracker._zones_faulted)

def test_zone_multi_zone_skip_restore(self):
panel_messages = [
'[0000000000000010----],001,[f707000600e5800c0c020000],"FAULT 1 "',
'[0000000000000010----],004,[f707000600e5800c0c020000],"FAULT 4 "',
'[0000000000000010----],002,[f707000600e5800c0c020000],"FAULT 2 "',
'[0000000000000010----],004,[f707000600e5800c0c020000],"FAULT 4 "',
'[0000000000000010----],004,[f707000600e5800c0c020000],"FAULT 4 "',
]

for m in panel_messages:
msg = Message(m)

self._zonetracker.update(msg)

self.assertNotIn(1, self._zonetracker._zones_faulted)
self.assertNotIn(2, self._zonetracker._zones_faulted)
self.assertIn(4, self._zonetracker._zones_faulted)

def test_zone_timeout_restore(self):
panel_messages = [
'[0000000000000010----],001,[f707000600e5800c0c020000],"FAULT 1 "',
'[0000000000000010----],004,[f707000600e5800c0c020000],"FAULT 4 "',
'[0000000000000010----],002,[f707000600e5800c0c020000],"FAULT 2 "',
'[0000000000000010----],004,[f707000600e5800c0c020000],"FAULT 4 "',
'[0000000000000010----],004,[f707000600e5800c0c020000],"FAULT 4 "',
]

for m in panel_messages:
msg = Message(m)

self._zonetracker.update(msg)

self.assertIn(4, self._zonetracker._zones_faulted)
self._zonetracker._zones[4].timestamp -= 35 # forcefully expire the zone

# generic message to force an update.
msg = Message('[0000000000000000----],000,[f707000600e5800c0c020000]," "')
self._zonetracker.update(msg)

self.assertNotIn(4, self._zonetracker._zones_faulted)

+ 3
- 1
setup.py View File

@@ -21,9 +21,11 @@ setup(name='pyad2',
license='',
packages=['pyad2'],
install_requires=[
'OpenSSL',
'pyopenssl',
'pyftdi'
],
test_suite='nose.collector',
tests_require=['nose', 'mock'],
scripts=['bin/ad2-sslterm', 'bin/ad2-firmwareupload'],
include_package_data=True,
zip_safe=False)

Loading…
Cancel
Save