Browse Source

Refactored _update_internal_states. Made battery/fire timeouts configurable.

pyserial_fix
Scott Petersen 11 years ago
parent
commit
ba91f5e4eb
2 changed files with 179 additions and 44 deletions
  1. +171
    -36
      alarmdecoder/decoder.py
  2. +8
    -8
      alarmdecoder/tests/test_ad2.py

+ 171
- 36
alarmdecoder/decoder.py View File

@@ -69,6 +69,8 @@ class AlarmDecoder(object):
self._device = device
self._zonetracker = Zonetracker()

self._battery_timeout = AlarmDecoder.BATTERY_TIMEOUT
self._fire_timeout = AlarmDecoder.FIRE_TIMEOUT
self._power_status = None
self._alarm_status = None
self._bypass_status = None
@@ -109,6 +111,44 @@ class AlarmDecoder(object):
"""
return self._device.id

@property
def battery_timeout(self):
"""
Retrieves the timeout for restoring the battery status, in seconds.

:returns: The battery status timeout
"""
return self._battery_timeout

@battery_timeout.setter
def battery_timeout(self, value):
"""
Sets the timeout for restoring the battery status, in seconds.

:param value: The timeout in seconds.
:type value: int
"""
self._battery_timeout = value

@property
def fire_timeout(self):
"""
Retrieves the timeout for restoring the fire status, in seconds.

:returns: The fire status timeout
"""
return self._fire_timeout

@fire_timeout.setter
def fire_timeout(self, value):
"""
Sets the timeout for restoring the fire status, in seconds.

:param value: The timeout in seconds.
:type value: int
"""
self._fire_timeout = value

def open(self, baudrate=None, no_reader_thread=False):
"""
Opens the device.
@@ -341,54 +381,149 @@ class AlarmDecoder(object):
:type message: Message, ExpanderMessage, LRRMessage, or RFMessage
"""
if isinstance(message, Message):
if message.ac_power != self._power_status:
self._power_status, old_status = message.ac_power, self._power_status
self._update_power_status(message)
self._update_alarm_status(message)
self._update_zone_bypass_status(message)
self._update_armed_status(message)
self._update_battery_status(message)
self._update_fire_status(message)

if old_status is not None:
self.on_power_changed(status=self._power_status)
elif isinstance(message, ExpanderMessage):
self._update_expander_status(message)

if message.alarm_sounding != self._alarm_status:
self._alarm_status, old_status = message.alarm_sounding, self._alarm_status
self._update_zone_tracker(message)

if old_status is not None:
self.on_alarm(status=self._alarm_status)
def _update_power_status(self, message):
"""
Uses the provided message to update the AC power state.

if message.zone_bypassed != self._bypass_status:
self._bypass_status, old_status = message.zone_bypassed, self._bypass_status
:param message: The message to use to update.
:type message: Message

if old_status is not None:
self.on_bypass(status=self._bypass_status)
:returns: Boolean indicating the new status
"""
if message.ac_power != self._power_status:
self._power_status, old_status = message.ac_power, self._power_status

if (message.armed_away | message.armed_home) != self._armed_status:
self._armed_status, old_status = message.armed_away | message.armed_home, self._armed_status
if old_status is not None:
self.on_power_changed(status=self._power_status)

if old_status is not None:
if self._armed_status:
self.on_arm()
else:
self.on_disarm()
return self._power_status

if message.battery_low == self._battery_status[0]:
self._battery_status = (self._battery_status[0], time.time())
else:
if message.battery_low is True or time.time() > self._battery_status[1] + AlarmDecoder.BATTERY_TIMEOUT:
self._battery_status = (message.battery_low, time.time())
self.on_low_battery(status=self._battery_status)
def _update_alarm_status(self, message):
"""
Uses the provided message to update the alarm state.

if message.fire_alarm == self._fire_status[0]:
self._fire_status = (self._fire_status[0], time.time())
else:
if message.fire_alarm is True or time.time() > self._fire_status[1] + AlarmDecoder.FIRE_TIMEOUT:
self._fire_status = (message.fire_alarm, time.time())
self.on_fire(status=self._fire_status)
:param message: The message to use to update.
:type message: Message

elif isinstance(message, ExpanderMessage):
if message.type == ExpanderMessage.RELAY:
self._relay_status[(message.address, message.channel)] = message.value
:returns: Boolean indicating the new status
"""

self.on_relay_changed(message=message)
if message.alarm_sounding != self._alarm_status:
self._alarm_status, old_status = message.alarm_sounding, self._alarm_status

self._update_zone_tracker(message)
if old_status is not None:
self.on_alarm(status=self._alarm_status)

return self._alarm_status

def _update_zone_bypass_status(self, message):
"""
Uses the provided message to update the zone bypass state.

:param message: The message to use to update.
:type message: Message

:returns: Boolean indicating the new status
"""

if message.zone_bypassed != self._bypass_status:
self._bypass_status, old_status = message.zone_bypassed, self._bypass_status

if old_status is not None:
self.on_bypass(status=self._bypass_status)

return self._bypass_status

def _update_armed_status(self, message):
"""
Uses the provided message to update the armed state.

:param message: The message to use to update.
:type message: Message

:returns: Boolean indicating the new status
"""

message_status = message.armed_away | message.armed_home
if message_status != self._armed_status:
self._armed_status, old_status = message_status, self._armed_status

if old_status is not None:
if self._armed_status:
self.on_arm()
else:
self.on_disarm()

return self._armed_status

def _update_battery_status(self, message):
"""
Uses the provided message to update the battery state.

:param message: The message to use to update.
:type message: Message

:returns: Boolean indicating the new status
"""

last_status, last_update = self._battery_status
if message.battery_low == last_status:
self._battery_status = (last_status, time.time())
else:
if message.battery_low is True or time.time() > last_update + self._battery_timeout:
self._battery_status = (message.battery_low, time.time())
self.on_low_battery(status=message.battery_low)

return self._battery_status[0]

def _update_fire_status(self, message):
"""
Uses the provided message to update the fire alarm state.

:param message: The message to use to update.
:type message: Message

:returns: Boolean indicating the new status
"""

last_status, last_update = self._fire_status
if message.fire_alarm == last_status:
self._fire_status = (last_status, time.time())
else:
if message.fire_alarm is True or time.time() > last_update + self._fire_timeout:
self._fire_status = (message.fire_alarm, time.time())
self.on_fire(status=message.fire_alarm)

return self._fire_status[0]

def _update_expander_status(self, message):
"""
Uses the provided message to update the expander states.

:param message: The message to use to update.
:type message: ExpanderMessage

:returns: Boolean indicating the new status
"""

if message.type == ExpanderMessage.RELAY:
self._relay_status[(message.address, message.channel)] = message.value

self.on_relay_changed(message=message)

return self._relay_status[(message.address, message.channel)]

def _update_zone_tracker(self, message):
"""


+ 8
- 8
alarmdecoder/tests/test_ad2.py View File

@@ -17,8 +17,8 @@ class TestAlarmDecoder(TestCase):
self._power_changed = False
self._alarmed = False
self._bypassed = False
self._battery = (False, 0)
self._fire = (False, 0)
self._battery = False
self._fire = False
self._armed = False
self._got_config = False
self._message_received = False
@@ -240,21 +240,21 @@ class TestAlarmDecoder(TestCase):

def test_battery_low_event(self):
msg = self._decoder._handle_message('[0000000000010000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._battery[0], True)
self.assertEquals(self._battery, True)

# force the timeout to expire.
with patch.object(time, 'time', return_value=self._battery[1] + 35):
with patch.object(time, 'time', return_value=self._decoder._battery_status[1] + 35):
msg = self._decoder._handle_message('[0000000000000000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._battery[0], False)
self.assertEquals(self._battery, False)

def test_fire_alarm_event(self):
msg = self._decoder._handle_message('[0000000000000100----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._fire[0], True)
self.assertEquals(self._fire, True)

# force the timeout to expire.
with patch.object(time, 'time', return_value=self._fire[1] + 35):
with patch.object(time, 'time', return_value=self._decoder._battery_status[1] + 35):
msg = self._decoder._handle_message('[0000000000000000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._fire[0], False)
self.assertEquals(self._fire, False)

def test_hit_for_faults(self):
self._decoder._handle_message('[0000000000000000----],000,[f707000600e5800c0c020000],"Hit * for faults "')


Loading…
Cancel
Save