Browse Source

Merge pull request #19 from nutechsoftware/dev

added on_ready_change event handler and tests
pyserial_fix
Sean Mathews 6 years ago
committed by GitHub
parent
commit
914bf6388f
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 98 additions and 69 deletions
  1. +74
    -52
      alarmdecoder/decoder.py
  2. +1
    -1
      setup.py
  3. +23
    -16
      test/test_ad2.py

+ 74
- 52
alarmdecoder/decoder.py View File

@@ -33,6 +33,7 @@ class AlarmDecoder(object):
on_arm = event.Event("This event is called when the panel is armed.\n\n**Callback definition:** *def callback(device, stay)*") on_arm = event.Event("This event is called when the panel is armed.\n\n**Callback definition:** *def callback(device, stay)*")
on_disarm = event.Event("This event is called when the panel is disarmed.\n\n**Callback definition:** *def callback(device)*") on_disarm = event.Event("This event is called when the panel is disarmed.\n\n**Callback definition:** *def callback(device)*")
on_power_changed = event.Event("This event is called when panel power switches between AC and DC.\n\n**Callback definition:** *def callback(device, status)*") on_power_changed = event.Event("This event is called when panel power switches between AC and DC.\n\n**Callback definition:** *def callback(device, status)*")
on_ready_changed = event.Event("This event is called when panel ready state changes.\n\n**Callback definition:** *def callback(device, status)*")
on_alarm = event.Event("This event is called when the alarm is triggered.\n\n**Callback definition:** *def callback(device, zone)*") on_alarm = event.Event("This event is called when the alarm is triggered.\n\n**Callback definition:** *def callback(device, zone)*")
on_alarm_restored = event.Event("This event is called when the alarm stops sounding.\n\n**Callback definition:** *def callback(device, zone)*") on_alarm_restored = event.Event("This event is called when the alarm stops sounding.\n\n**Callback definition:** *def callback(device, zone)*")
on_fire = event.Event("This event is called when a fire is detected.\n\n**Callback definition:** *def callback(device, status)*") on_fire = event.Event("This event is called when a fire is detected.\n\n**Callback definition:** *def callback(device, status)*")
@@ -122,7 +123,7 @@ class AlarmDecoder(object):
version_flags = "" version_flags = ""
"""Device flags enabled""" """Device flags enabled"""


def __init__(self, device, ignore_message_states=False):
def __init__(self, device, ignore_message_states=False, ignore_lrr_states=True):
""" """
Constructor Constructor


@@ -131,23 +132,24 @@ class AlarmDecoder(object):
:type device: Device :type device: Device
:param ignore_message_states: Ignore regular panel messages when updating internal states :param ignore_message_states: Ignore regular panel messages when updating internal states
:type ignore_message_states: bool :type ignore_message_states: bool
:param ignore_lrr_states: Ignore LRR panel messages when updating internal states
:type ignore_lrr_states: bool
""" """
self._device = device self._device = device
self._zonetracker = Zonetracker(self) self._zonetracker = Zonetracker(self)
self._lrr_system = LRRSystem(self) self._lrr_system = LRRSystem(self)


self._ignore_message_states = ignore_message_states self._ignore_message_states = ignore_message_states
self._ignore_lrr_states = ignore_lrr_states
self._battery_timeout = AlarmDecoder.BATTERY_TIMEOUT self._battery_timeout = AlarmDecoder.BATTERY_TIMEOUT
self._fire_timeout = AlarmDecoder.FIRE_TIMEOUT self._fire_timeout = AlarmDecoder.FIRE_TIMEOUT
self._power_status = None self._power_status = None
self._ready_status = None
self._alarm_status = None self._alarm_status = None
self._bypass_status = {} self._bypass_status = {}
self._armed_status = None self._armed_status = None
self._armed_stay = False self._armed_stay = False
self._fire_status = (False, 0)
self._fire_alarming = False
self._fire_alarming_changed = 0
self._fire_state = FireState.NONE
self._fire_status = False
self._battery_status = (False, 0) self._battery_status = (False, 0)
self._panic_status = False self._panic_status = False
self._relay_status = {} self._relay_status = {}
@@ -413,7 +415,10 @@ class AlarmDecoder(object):
:returns: :py:class:`~alarmdecoder.messages.Message` :returns: :py:class:`~alarmdecoder.messages.Message`
""" """


data = data.decode('utf-8')
try:
data = data.decode('utf-8')
except:
raise InvalidMessageError('Decode failed for message: {0}'.format(data))


if data is not None: if data is not None:
data = data.lstrip('\0') data = data.lstrip('\0')
@@ -468,8 +473,6 @@ class AlarmDecoder(object):
if self._internal_address_mask & msg.mask > 0: if self._internal_address_mask & msg.mask > 0:
if not self._ignore_message_states: if not self._ignore_message_states:
self._update_internal_states(msg) self._update_internal_states(msg)
else:
self._update_fire_status(status=None)


self.on_message(message=msg) self.on_message(message=msg)


@@ -517,7 +520,8 @@ class AlarmDecoder(object):
""" """
msg = LRRMessage(data) msg = LRRMessage(data)


self._lrr_system.update(msg)
if not self._ignore_lrr_states:
self._lrr_system.update(msg)
self.on_lrr_message(message=msg) self.on_lrr_message(message=msg)


return msg return msg
@@ -608,10 +612,10 @@ class AlarmDecoder(object):
:type message: :py:class:`~alarmdecoder.messages.Message`, :py:class:`~alarmdecoder.messages.ExpanderMessage`, :py:class:`~alarmdecoder.messages.LRRMessage`, or :py:class:`~alarmdecoder.messages.RFMessage` :type message: :py:class:`~alarmdecoder.messages.Message`, :py:class:`~alarmdecoder.messages.ExpanderMessage`, :py:class:`~alarmdecoder.messages.LRRMessage`, or :py:class:`~alarmdecoder.messages.RFMessage`
""" """
if isinstance(message, Message) and not self._ignore_message_states: if isinstance(message, Message) and not self._ignore_message_states:
self._update_armed_ready_status(message)
self._update_power_status(message) self._update_power_status(message)
self._update_alarm_status(message) self._update_alarm_status(message)
self._update_zone_bypass_status(message) self._update_zone_bypass_status(message)
self._update_armed_status(message)
self._update_battery_status(message) self._update_battery_status(message)
self._update_fire_status(message) self._update_fire_status(message)


@@ -710,6 +714,52 @@ class AlarmDecoder(object):


return bypass_status return bypass_status


def _update_armed_ready_status(self, message=None):
"""
Uses the provided message to update the armed state
and ready state at once as they can change in the same
message and we want both events to have the same states.
:param message: message to use to update
:type message: :py:class:`~alarmdecoder.messages.Message`

"""

arm_status = None
stay_status = None
ready_status = None

send_ready = False
send_arm = False

if isinstance(message, Message):
arm_status = message.armed_away
stay_status = message.armed_home
ready_status = message.ready

if arm_status is None or stay_status is None or ready_status is None:
return

self._armed_stay, old_stay = stay_status, self._armed_stay
self._armed_status, old_arm = arm_status, self._armed_status
self._ready_status, old_ready_status = ready_status, self._ready_status

if old_arm is not None:
if arm_status != old_arm or stay_status != old_stay:
send_arm = True

if old_ready_status is not None:
if ready_status != old_ready_status:
send_ready = True

if send_ready:
self.on_ready_changed(status=self._ready_status)

if send_arm:
if self._armed_status or self._armed_stay:
self.on_arm(stay=stay_status)
else:
self.on_disarm()

def _update_armed_status(self, message=None, status=None, status_stay=None): def _update_armed_status(self, message=None, status=None, status_stay=None):
""" """
Uses the provided message to update the armed state. Uses the provided message to update the armed state.
@@ -783,54 +833,26 @@ class AlarmDecoder(object):


:returns: boolean indicating the new status :returns: boolean indicating the new status
""" """
is_lrr = status is not None
fire_status = status fire_status = status
last_status = self._fire_status
if isinstance(message, Message): if isinstance(message, Message):
fire_status = message.fire_alarm

last_status, last_update = self._fire_status

if self._fire_state == FireState.NONE:
# Always move to a FIRE state if detected
if fire_status == True:
self._fire_state = FireState.ALARM
self._fire_status = (fire_status, time.time())

self.on_fire(status=FireState.ALARM)

elif self._fire_state == FireState.ALARM:
# If we've received an LRR CANCEL message, move to ACKNOWLEDGED
if is_lrr and fire_status == False:
self._fire_state = FireState.ACKNOWLEDGED
self._fire_status = (fire_status, time.time())
self.on_fire(status=FireState.ACKNOWLEDGED)
# Quirk in Ademco panels. The fire bit drops on "SYSTEM LO BAT" messages.
# FIXME: does not support non english panels.
if self.mode == ADEMCO and message.text.startswith("SYSTEM"):
fire_status = last_status
else: else:
# Handle bouncing status changes and timeout in order to revert back to NONE.
if last_status != fire_status or fire_status == True:
self._fire_status = (fire_status, time.time())
if fire_status == False and time.time() > last_update + self._fire_timeout:
self._fire_state = FireState.NONE
self.on_fire(status=FireState.NONE)

elif self._fire_state == FireState.ACKNOWLEDGED:
# If we've received a second LRR FIRE message after a CANCEL, revert back to FIRE and trigger another event.
if is_lrr and fire_status == True:
self._fire_state = FireState.ALARM
self._fire_status = (fire_status, time.time())

self.on_fire(status=FireState.ALARM)
else:
# Handle bouncing status changes and timeout in order to revert back to NONE.
if last_status != fire_status or fire_status == True:
self._fire_status = (fire_status, time.time())
fire_status = message.fire_alarm

if fire_status is None:
return


if fire_status != True and time.time() > last_update + self._fire_timeout:
self._fire_state = FireState.NONE
self.on_fire(status=FireState.NONE)
if fire_status != self._fire_status:
self._fire_status, old_status = fire_status, self._fire_status


return self._fire_state == FireState.ALARM
if old_status is not None:
self.on_fire(status=self._fire_status)


return self._fire_status


def _update_panic_status(self, status=None): def _update_panic_status(self, status=None):
""" """


+ 1
- 1
setup.py View File

@@ -14,7 +14,7 @@ if sys.version_info < (3,):
extra_requirements.append('future==0.14.3') extra_requirements.append('future==0.14.3')


setup(name='alarmdecoder', setup(name='alarmdecoder',
version='1.13.2',
version='1.13.3',
description='Python interface for the AlarmDecoder (AD2) family ' description='Python interface for the AlarmDecoder (AD2) family '
'of alarm devices which includes the AD2USB, AD2SERIAL and AD2PI.', 'of alarm devices which includes the AD2USB, AD2SERIAL and AD2PI.',
long_description=readme(), long_description=readme(),


+ 23
- 16
test/test_ad2.py View File

@@ -20,6 +20,7 @@ class TestAlarmDecoder(TestCase):
self._panicked = False self._panicked = False
self._relay_changed = False self._relay_changed = False
self._power_changed = False self._power_changed = False
self._ready_changed = False
self._alarmed = False self._alarmed = False
self._bypassed = False self._bypassed = False
self._battery = False self._battery = False
@@ -42,10 +43,11 @@ class TestAlarmDecoder(TestCase):
self._device.on_read = EventHandler(Event(), self._device) self._device.on_read = EventHandler(Event(), self._device)
self._device.on_write = EventHandler(Event(), self._device) self._device.on_write = EventHandler(Event(), self._device)


self._decoder = AlarmDecoder(self._device)
self._decoder = AlarmDecoder(self._device, ignore_lrr_states=False)
self._decoder.on_panic += self.on_panic self._decoder.on_panic += self.on_panic
self._decoder.on_relay_changed += self.on_relay_changed self._decoder.on_relay_changed += self.on_relay_changed
self._decoder.on_power_changed += self.on_power_changed self._decoder.on_power_changed += self.on_power_changed
self._decoder.on_ready_changed += self.on_ready_changed
self._decoder.on_alarm += self.on_alarm self._decoder.on_alarm += self.on_alarm
self._decoder.on_alarm_restored += self.on_alarm_restored self._decoder.on_alarm_restored += self.on_alarm_restored
self._decoder.on_bypass += self.on_bypass self._decoder.on_bypass += self.on_bypass
@@ -79,6 +81,9 @@ class TestAlarmDecoder(TestCase):
def on_power_changed(self, sender, *args, **kwargs): def on_power_changed(self, sender, *args, **kwargs):
self._power_changed = kwargs['status'] self._power_changed = kwargs['status']


def on_ready_changed(self, sender, *args, **kwargs):
self._ready_changed = kwargs['status']

def on_alarm(self, sender, *args, **kwargs): def on_alarm(self, sender, *args, **kwargs):
self._alarmed = True self._alarmed = True


@@ -240,6 +245,17 @@ class TestAlarmDecoder(TestCase):
msg = self._decoder._handle_message(b'[0000000100000000----],000,[f707000600e5800c0c020000]," "') msg = self._decoder._handle_message(b'[0000000100000000----],000,[f707000600e5800c0c020000]," "')
self.assertTrue(self._power_changed) self.assertTrue(self._power_changed)


def test_ready_changed_event(self):
msg = self._decoder._handle_message(b'[0000000000000000----],000,[f707000600e5800c0c020000]," "')
self.assertFalse(self._ready_changed) # Not set first time we hit it.

msg = self._decoder._handle_message(b'[1000000000000000----],000,[f707000600e5800c0c020000]," "')
self.assertTrue(self._ready_changed)

msg = self._decoder._handle_message(b'[0000000000000000----],000,[f707000600e5800c0c020000]," "')
self.assertFalse(self._ready_changed)


def test_alarm_event(self): def test_alarm_event(self):
msg = self._decoder._handle_message(b'[0000000000100000----],000,[f707000600e5800c0c020000]," "') msg = self._decoder._handle_message(b'[0000000000100000----],000,[f707000600e5800c0c020000]," "')
self.assertFalse(self._alarmed) # Not set first time we hit it. self.assertFalse(self._alarmed) # Not set first time we hit it.
@@ -288,32 +304,23 @@ class TestAlarmDecoder(TestCase):
self.assertFalse(self._battery) self.assertFalse(self._battery)


def test_fire_alarm_event(self): def test_fire_alarm_event(self):
self._fire = FireState.NONE
msg = self._decoder._handle_message(b'[0000000000000000----],000,[f707000600e5800c0c020000]," "')
self.assertFalse(self._fire) # Not set the first time we hit it.


msg = self._decoder._handle_message(b'[0000000000000100----],000,[f707000600e5800c0c020000]," "') msg = self._decoder._handle_message(b'[0000000000000100----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._fire, FireState.ALARM)

# force the timeout to expire.
with patch.object(time, 'time', return_value=self._decoder._fire_status[1] + 35):
msg = self._decoder._handle_message(b'[0000000000000000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._fire, FireState.NONE)
self.assertTrue(self._fire)


def test_fire_lrr(self): def test_fire_lrr(self):
self._fire = FireState.NONE
self._fire = False


msg = self._decoder._handle_message(b'!LRR:095,1,CID_1110,ff') # Fire: Non-specific msg = self._decoder._handle_message(b'!LRR:095,1,CID_1110,ff') # Fire: Non-specific


self.assertIsInstance(msg, LRRMessage) self.assertIsInstance(msg, LRRMessage)
self.assertEquals(self._fire, FireState.ALARM)
self.assertTrue(self._fire)


msg = self._decoder._handle_message(b'!LRR:001,1,CID_1406,ff') # Open/Close: Cancel msg = self._decoder._handle_message(b'!LRR:001,1,CID_1406,ff') # Open/Close: Cancel
self.assertIsInstance(msg, LRRMessage) self.assertIsInstance(msg, LRRMessage)
self.assertEquals(self._fire, FireState.ACKNOWLEDGED)

# force the timeout to expire.
with patch.object(time, 'time', return_value=self._decoder._fire_status[1] + 35):
msg = self._decoder._handle_message(b'[0000000000000000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._fire, FireState.NONE)
self.assertFalse(self._fire)


def test_hit_for_faults(self): def test_hit_for_faults(self):
self._decoder._handle_message(b'[0000000000000000----],000,[f707000600e5800c0c020000],"Hit * for faults "') self._decoder._handle_message(b'[0000000000000000----],000,[f707000600e5800c0c020000],"Hit * for faults "')


Loading…
Cancel
Save