"""
Provides the main AlarmDecoder class.
.. _AlarmDecoder: http://www.alarmdecoder.com
.. moduleauthor:: Scott Petersen <scott@nutech.com>
"""
import time
import re
from .event import event
from .util import InvalidMessageError
from .messages import Message, ExpanderMessage, RFMessage, LRRMessage
from .zonetracking import Zonetracker
from .panels import PANEL_TYPES, ADEMCO, DSC
[docs]class AlarmDecoder(object):
"""
High-level wrapper around `AlarmDecoder`_ (AD2) devices.
"""
# High-level Events
on_arm = event.Event("This event is called when the panel is armed.\n\n**Callback definition:** *def callback(device)*")
on_disarm = event.Event("This event is called when the panel is disarmed.\n\n**Callback definition:** *def callback(device)*")
on_power_changed = event.Event("This event is called when panel power switches between AC and DC.\n\n**Callback definition:** *def callback(device, status)*")
on_alarm = event.Event("This event is called when the alarm is triggered.\n\n**Callback definition:** *def callback(device, zone)*")
on_alarm_restored = event.Event("This event is called when the alarm stops sounding.\n\n**Callback definition:** *def callback(device, zone)*")
on_fire = event.Event("This event is called when a fire is detected.\n\n**Callback definition:** *def callback(device, status)*")
on_bypass = event.Event("This event is called when a zone is bypassed. \n\n\n\n**Callback definition:** *def callback(device, status)*")
on_boot = event.Event("This event is called when the device finishes booting.\n\n**Callback definition:** *def callback(device)*")
on_config_received = event.Event("This event is called when the device receives its configuration. \n\n**Callback definition:** *def callback(device)*")
on_zone_fault = event.Event("This event is called when :py:class:`~alarmdecoder.zonetracking.Zonetracker` detects a zone fault.\n\n**Callback definition:** *def callback(device, zone)*")
on_zone_restore = event.Event("This event is called when :py:class:`~alarmdecoder.zonetracking.Zonetracker` detects that a fault is restored.\n\n**Callback definition:** *def callback(device, zone)*")
on_low_battery = event.Event("This event is called when the device detects a low battery.\n\n**Callback definition:** *def callback(device, status)*")
on_panic = event.Event("This event is called when the device detects a panic.\n\n**Callback definition:** *def callback(device, status)*")
on_relay_changed = event.Event("This event is called when a relay is opened or closed on an expander board.\n\n**Callback definition:** *def callback(device, message)*")
# Mid-level Events
on_message = event.Event("This event is called when standard panel :py:class:`~alarmdecoder.messages.Message` is received.\n\n**Callback definition:** *def callback(device, message)*")
on_expander_message = event.Event("This event is called when an :py:class:`~alarmdecoder.messages.ExpanderMessage` is received.\n\n**Callback definition:** *def callback(device, message)*")
on_lrr_message = event.Event("This event is called when an :py:class:`~alarmdecoder.messages.LRRMessage` is received.\n\n**Callback definition:** *def callback(device, message)*")
on_rfx_message = event.Event("This event is called when an :py:class:`~alarmdecoder.messages.RFMessage` is received.\n\n**Callback definition:** *def callback(device, message)*")
on_sending_received = event.Event("This event is called when a !Sending.done message is received from the AlarmDecoder.\n\n**Callback definition:** *def callback(device, status, message)*")
# Low-level Events
on_open = event.Event("This event is called when the device has been opened.\n\n**Callback definition:** *def callback(device)*")
on_close = event.Event("This event is called when the device has been closed.\n\n**Callback definition:** *def callback(device)*")
on_read = event.Event("This event is called when a line has been read from the device.\n\n**Callback definition:** *def callback(device, data)*")
on_write = event.Event("This event is called when data has been written to the device.\n\n**Callback definition:** *def callback(device, data)*")
# Constants
KEY_F1 = unichr(1) + unichr(1) + unichr(1)
"""Represents panel function key #1"""
KEY_F2 = unichr(2) + unichr(2) + unichr(2)
"""Represents panel function key #2"""
KEY_F3 = unichr(3) + unichr(3) + unichr(3)
"""Represents panel function key #3"""
KEY_F4 = unichr(4) + unichr(4) + unichr(4)
"""Represents panel function key #4"""
KEY_PANIC = unichr(5) + unichr(5) + unichr(5)
"""Represents a panic keypress"""
BATTERY_TIMEOUT = 30
"""Default timeout (in seconds) before the battery status reverts."""
FIRE_TIMEOUT = 30
"""Default tTimeout (in seconds) before the fire status reverts."""
# Attributes
address = 18
"""The keypad address in use by the device."""
configbits = 0xFF00
"""The configuration bits set on the device."""
address_mask = 0xFFFFFFFF
"""The address mask configured on the device."""
emulate_zone = [False for _ in range(5)]
"""List containing the devices zone emulation status."""
emulate_relay = [False for _ in range(4)]
"""List containing the devices relay emulation status."""
emulate_lrr = False
"""The status of the devices LRR emulation."""
deduplicate = False
"""The status of message deduplication as configured on the device."""
mode = ADEMCO
"""The panel mode that the AlarmDecoder is in. Currently supports ADEMCO and DSC."""
def __init__(self, device):
"""
Constructor
:param device: The low-level device used for this `AlarmDecoder`_
interface.
:type device: Device
"""
self._device = device
self._zonetracker = Zonetracker(self)
self._battery_timeout = AlarmDecoder.BATTERY_TIMEOUT
self._fire_timeout = AlarmDecoder.FIRE_TIMEOUT
self._power_status = None
self._alarm_status = None
self._bypass_status = None
self._armed_status = None
self._fire_status = (False, 0)
self._battery_status = (False, 0)
self._panic_status = None
self._relay_status = {}
self._internal_address_mask = 0xFFFFFFFF
self.address = 18
self.configbits = 0xFF00
self.address_mask = 0x00000000
self.emulate_zone = [False for x in range(5)]
self.emulate_relay = [False for x in range(4)]
self.emulate_lrr = False
self.deduplicate = False
self.mode = ADEMCO
def __enter__(self):
"""
Support for context manager __enter__.
"""
return self
def __exit__(self, exc_type, exc_value, traceback):
"""
Support for context manager __exit__.
"""
self.close()
return False
@property
[docs] def id(self):
"""
The ID of the `AlarmDecoder`_ device.
:returns: identification string for the device
"""
return self._device.id
@property
def battery_timeout(self):
"""
Retrieves the timeout for restoring the battery status, in seconds.
:returns: battery status timeout
"""
return self._battery_timeout
@battery_timeout.setter
[docs] def battery_timeout(self, value):
"""
Sets the timeout for restoring the battery status, in seconds.
:param value: timeout in seconds
:type value: int
"""
self._battery_timeout = value
@property
def fire_timeout(self):
"""
Retrieves the timeout for restoring the fire status, in seconds.
:returns: fire status timeout
"""
return self._fire_timeout
@fire_timeout.setter
[docs] def fire_timeout(self, value):
"""
Sets the timeout for restoring the fire status, in seconds.
:param value: timeout in seconds
:type value: int
"""
self._fire_timeout = value
@property
def internal_address_mask(self):
"""
Retrieves the address mask used for updating internal status.
:returns: address mask
"""
return self._internal_address_mask
@internal_address_mask.setter
[docs] def internal_address_mask(self, value):
"""
Sets the address mask used internally for updating status.
:param value: address mask
:type value: int
"""
self._internal_address_mask = value
[docs] def open(self, baudrate=None, no_reader_thread=False):
"""
Opens the device.
:param baudrate: baudrate used for the device. Defaults to the lower-level device default.
:type baudrate: int
:param no_reader_thread: Specifies whether or not the automatic reader
thread should be started.
:type no_reader_thread: bool
"""
self._wire_events()
self._device.open(baudrate=baudrate, no_reader_thread=no_reader_thread)
return self
[docs] def close(self):
"""
Closes the device.
"""
if self._device:
self._device.close()
del self._device
self._device = None
[docs] def send(self, data):
"""
Sends data to the `AlarmDecoder`_ device.
:param data: data to send
:type data: string
"""
if self._device:
self._device.write(str(data))
[docs] def get_config(self):
"""
Retrieves the configuration from the device. Called automatically by :py:meth:`_on_open`.
"""
self.send("C\r")
[docs] def save_config(self):
"""
Sets configuration entries on the device.
"""
config_string = ''
config_entries = []
# HACK: This is ugly.. but I can't think of an elegant way of doing it.
config_entries.append(('ADDRESS', '{0}'.format(self.address)))
config_entries.append(('CONFIGBITS', '{0:x}'.format(self.configbits)))
config_entries.append(('MASK', '{0:x}'.format(self.address_mask)))
config_entries.append(('EXP',
''.join(['Y' if z else 'N' for z in self.emulate_zone])))
config_entries.append(('REL',
''.join(['Y' if r else 'N' for r in self.emulate_relay])))
config_entries.append(('LRR', 'Y' if self.emulate_lrr else 'N'))
config_entries.append(('DEDUPLICATE', 'Y' if self.deduplicate else 'N'))
config_entries.append(('MODE', PANEL_TYPES.keys()[PANEL_TYPES.values().index(self.mode)]))
config_string = '&'.join(['='.join(t) for t in config_entries])
self.send("C{0}\r".format(config_string))
[docs] def reboot(self):
"""
Reboots the device.
"""
self.send('=')
[docs] def fault_zone(self, zone, simulate_wire_problem=False):
"""
Faults a zone if we are emulating a zone expander.
:param zone: zone to fault
:type zone: int
:param simulate_wire_problem: Whether or not to simulate a wire fault
:type simulate_wire_problem: bool
"""
# Allow ourselves to also be passed an address/channel combination
# for zone expanders.
#
# Format (expander index, channel)
if isinstance(zone, tuple):
expander_idx, channel = zone
zone = self._zonetracker.expander_to_zone(expander_idx, channel)
status = 2 if simulate_wire_problem else 1
self.send("L{0:02}{1}\r".format(zone, status))
[docs] def clear_zone(self, zone):
"""
Clears a zone if we are emulating a zone expander.
:param zone: zone to clear
:type zone: int
"""
self.send("L{0:02}0\r".format(zone))
def _wire_events(self):
"""
Wires up the internal device events.
"""
self._device.on_open += self._on_open
self._device.on_close += self._on_close
self._device.on_read += self._on_read
self._device.on_write += self._on_write
self._zonetracker.on_fault += self._on_zone_fault
self._zonetracker.on_restore += self._on_zone_restore
def _handle_message(self, data):
"""
Parses keypad messages from the panel.
:param data: keypad data to parse
:type data: string
:returns: :py:class:`~alarmdecoder.messages.Message`
"""
if data is not None:
data = data.lstrip('\0')
if data is None or data == '':
raise InvalidMessageError()
msg = None
header = data[0:4]
if header[0] != '!' or header == '!KPM':
msg = self._handle_keypad_message(data)
elif header == '!EXP' or header == '!REL':
msg = self._handle_expander_message(data)
elif header == '!RFX':
msg = self._handle_rfx(data)
elif header == '!LRR':
msg = self._handle_lrr(data)
elif data.startswith('!Ready'):
self.on_boot()
elif data.startswith('!CONFIG'):
self._handle_config(data)
elif data.startswith('!Sending'):
self._handle_sending(data)
return msg
def _handle_keypad_message(self, data):
"""
Handle keypad messages.
:param data: keypad message to parse
:type data: string
:returns: :py:class:`~alarmdecoder.messages.Message`
"""
msg = Message(data)
if self._internal_address_mask & msg.mask > 0:
self._update_internal_states(msg)
self.on_message(message=msg)
return msg
def _handle_expander_message(self, data):
"""
Handle expander messages.
:param data: expander message to parse
:type data: string
:returns: :py:class:`~alarmdecoder.messages.ExpanderMessage`
"""
msg = ExpanderMessage(data)
self._update_internal_states(msg)
self.on_expander_message(message=msg)
return msg
def _handle_rfx(self, data):
"""
Handle RF messages.
:param data: RF message to parse
:type data: string
:returns: :py:class:`~alarmdecoder.messages.RFMessage`
"""
msg = RFMessage(data)
self.on_rfx_message(message=msg)
return msg
def _handle_lrr(self, data):
"""
Handle Long Range Radio messages.
:param data: LRR message to parse
:type data: string
:returns: :py:class:`~alarmdecoder.messages.LRRMessage`
"""
msg = LRRMessage(data)
if msg.event_type == 'ALARM_PANIC':
self._panic_status = True
self.on_panic(status=True)
elif msg.event_type == 'CANCEL':
if self._panic_status is True:
self._panic_status = False
self.on_panic(status=False)
self.on_lrr_message(message=msg)
return msg
def _handle_config(self, data):
"""
Handles received configuration data.
:param data: Configuration string to parse
:type data: string
"""
_, config_string = data.split('>')
for setting in config_string.split('&'):
key, val = setting.split('=')
if key == 'ADDRESS':
self.address = int(val)
elif key == 'CONFIGBITS':
self.configbits = int(val, 16)
elif key == 'MASK':
self.address_mask = int(val, 16)
elif key == 'EXP':
self.emulate_zone = [val[z] == 'Y' for z in range(5)]
elif key == 'REL':
self.emulate_relay = [val[r] == 'Y' for r in range(4)]
elif key == 'LRR':
self.emulate_lrr = (val == 'Y')
elif key == 'DEDUPLICATE':
self.deduplicate = (val == 'Y')
elif key == 'MODE':
self.mode = PANEL_TYPES[val]
self.on_config_received()
def _handle_sending(self, data):
"""
Handles results of a keypress send.
:param data: Sending string to parse
:type data: string
"""
matches = re.match('^!Sending(\.{1,5})done.*', data)
if matches is not None:
good_send = False
if len(matches.group(1)) < 5:
good_send = True
self.on_sending_received(status=good_send, message=data)
def _update_internal_states(self, message):
"""
Updates internal device states.
:param message: :py:class:`~alarmdecoder.messages.Message` to update internal states with
:type message: :py:class:`~alarmdecoder.messages.Message`, :py:class:`~alarmdecoder.messages.ExpanderMessage`, :py:class:`~alarmdecoder.messages.LRRMessage`, or :py:class:`~alarmdecoder.messages.RFMessage`
"""
if isinstance(message, Message):
self._update_power_status(message)
self._update_alarm_status(message)
self._update_zone_bypass_status(message)
self._update_armed_status(message)
self._update_battery_status(message)
self._update_fire_status(message)
elif isinstance(message, ExpanderMessage):
self._update_expander_status(message)
self._update_zone_tracker(message)
def _update_power_status(self, message):
"""
Uses the provided message to update the AC power state.
:param message: message to use to update
:type message: :py:class:`~alarmdecoder.messages.Message`
:returns: bool indicating the new status
"""
if message.ac_power != self._power_status:
self._power_status, old_status = message.ac_power, self._power_status
if old_status is not None:
self.on_power_changed(status=self._power_status)
return self._power_status
def _update_alarm_status(self, message):
"""
Uses the provided message to update the alarm state.
:param message: message to use to update
:type message: :py:class:`~alarmdecoder.messages.Message`
:returns: bool indicating the new status
"""
if message.alarm_sounding != self._alarm_status:
self._alarm_status, old_status = message.alarm_sounding, self._alarm_status
if old_status is not None:
if self._alarm_status:
self.on_alarm(zone=message.numeric_code)
else:
self.on_alarm_restored(zone=message.numeric_code)
return self._alarm_status
def _update_zone_bypass_status(self, message):
"""
Uses the provided message to update the zone bypass state.
:param message: message to use to update
:type message: :py:class:`~alarmdecoder.messages.Message`
:returns: bool indicating the new status
"""
if message.zone_bypassed != self._bypass_status:
self._bypass_status, old_status = message.zone_bypassed, self._bypass_status
if old_status is not None:
self.on_bypass(status=self._bypass_status)
return self._bypass_status
def _update_armed_status(self, message):
"""
Uses the provided message to update the armed state.
:param message: message to use to update
:type message: :py:class:`~alarmdecoder.messages.Message`
:returns: bool indicating the new status
"""
message_status = message.armed_away | message.armed_home
if message_status != self._armed_status:
self._armed_status, old_status = message_status, self._armed_status
if old_status is not None:
if self._armed_status:
self.on_arm()
else:
self.on_disarm()
return self._armed_status
def _update_battery_status(self, message):
"""
Uses the provided message to update the battery state.
:param message: message to use to update
:type message: :py:class:`~alarmdecoder.messages.Message`
:returns: boolean indicating the new status
"""
last_status, last_update = self._battery_status
if message.battery_low == last_status:
self._battery_status = (last_status, time.time())
else:
if message.battery_low is True or time.time() > last_update + self._battery_timeout:
self._battery_status = (message.battery_low, time.time())
self.on_low_battery(status=message.battery_low)
return self._battery_status[0]
def _update_fire_status(self, message):
"""
Uses the provided message to update the fire alarm state.
:param message: message to use to update
:type message: :py:class:`~alarmdecoder.messages.Message`
:returns: boolean indicating the new status
"""
last_status, last_update = self._fire_status
if message.fire_alarm == last_status:
self._fire_status = (last_status, time.time())
else:
if message.fire_alarm is True or time.time() > last_update + self._fire_timeout:
self._fire_status = (message.fire_alarm, time.time())
self.on_fire(status=message.fire_alarm)
return self._fire_status[0]
def _update_expander_status(self, message):
"""
Uses the provided message to update the expander states.
:param message: message to use to update
:type message: :py:class:`~alarmdecoder.messages.ExpanderMessage`
:returns: boolean indicating the new status
"""
if message.type == ExpanderMessage.RELAY:
self._relay_status[(message.address, message.channel)] = message.value
self.on_relay_changed(message=message)
return self._relay_status[(message.address, message.channel)]
def _update_zone_tracker(self, message):
"""
Trigger an update of the :py:class:`~alarmdecoder.messages.Zonetracker`.
:param message: message to update the zonetracker with
:type message: :py:class:`~alarmdecoder.messages.Message`, :py:class:`~alarmdecoder.messages.ExpanderMessage`, :py:class:`~alarmdecoder.messages.LRRMessage`, or :py:class:`~alarmdecoder.messages.RFMessage`
"""
# Retrieve a list of faults.
# NOTE: This only happens on first boot or after exiting programming mode.
if isinstance(message, Message):
if not message.ready and "Hit * for faults" in message.text:
self.send('*')
return
self._zonetracker.update(message)
def _on_open(self, sender, *args, **kwargs):
"""
Internal handler for opening the device.
"""
self.get_config()
self.on_open()
def _on_close(self, sender, *args, **kwargs):
"""
Internal handler for closing the device.
"""
self.on_close()
def _on_read(self, sender, *args, **kwargs):
"""
Internal handler for reading from the device.
"""
data = kwargs.get('data', None)
self.on_read(data=data)
self._handle_message(data)
def _on_write(self, sender, *args, **kwargs):
"""
Internal handler for writing to the device.
"""
self.on_write(data=kwargs.get('data', None))
def _on_zone_fault(self, sender, *args, **kwargs):
"""
Internal handler for zone faults.
"""
self.on_zone_fault(*args, **kwargs)
def _on_zone_restore(self, sender, *args, **kwargs):
"""
Internal handler for zone restoration.
"""
self.on_zone_restore(*args, **kwargs)