Browse Source

Merge branch 'new-lrr-format' into dev

pyserial_fix
Scott Petersen 7 years ago
parent
commit
d0229e15ea
26 changed files with 3262 additions and 1800 deletions
  1. +1
    -0
      .gitignore
  2. +198
    -48
      alarmdecoder/decoder.py
  3. +0
    -1282
      alarmdecoder/devices.py
  4. +6
    -0
      alarmdecoder/devices/__init__.py
  5. +147
    -0
      alarmdecoder/devices/base_device.py
  6. +278
    -0
      alarmdecoder/devices/serial_device.py
  7. +399
    -0
      alarmdecoder/devices/socket_device.py
  8. +490
    -0
      alarmdecoder/devices/usb_device.py
  9. +0
    -410
      alarmdecoder/messages.py
  10. +9
    -0
      alarmdecoder/messages/__init__.py
  11. +47
    -0
      alarmdecoder/messages/aui_message.py
  12. +46
    -0
      alarmdecoder/messages/base_message.py
  13. +83
    -0
      alarmdecoder/messages/expander_message.py
  14. +9
    -0
      alarmdecoder/messages/lrr/__init__.py
  15. +819
    -0
      alarmdecoder/messages/lrr/events.py
  16. +113
    -0
      alarmdecoder/messages/lrr/message.py
  17. +164
    -0
      alarmdecoder/messages/lrr/system.py
  18. +190
    -0
      alarmdecoder/messages/panel_message.py
  19. +82
    -0
      alarmdecoder/messages/rf_message.py
  20. +7
    -0
      alarmdecoder/states.py
  21. +39
    -0
      alarmdecoder/util.py
  22. +2
    -8
      alarmdecoder/zonetracking.py
  23. +69
    -41
      test/test_ad2.py
  24. +14
    -8
      test/test_devices.py
  25. +47
    -3
      test/test_messages.py
  26. +3
    -0
      test/test_zonetracking.py

+ 1
- 0
.gitignore View File

@@ -8,3 +8,4 @@ tmp
*.egg-info
bin/ad2-test
*~
.vscode

+ 198
- 48
alarmdecoder/decoder.py View File

@@ -17,9 +17,11 @@ except ImportError:

from .event import event
from .util import InvalidMessageError
from .messages import Message, ExpanderMessage, RFMessage, LRRMessage
from .messages import Message, ExpanderMessage, RFMessage, LRRMessage, AUIMessage
from .messages.lrr import LRRSystem
from .zonetracking import Zonetracker
from .panels import PANEL_TYPES, ADEMCO, DSC
from .states import FireState


class AlarmDecoder(object):
@@ -49,6 +51,7 @@ class AlarmDecoder(object):
on_lrr_message = event.Event("This event is called when an :py:class:`~alarmdecoder.messages.LRRMessage` is received.\n\n**Callback definition:** *def callback(device, message)*")
on_rfx_message = event.Event("This event is called when an :py:class:`~alarmdecoder.messages.RFMessage` is received.\n\n**Callback definition:** *def callback(device, message)*")
on_sending_received = event.Event("This event is called when a !Sending.done message is received from the AlarmDecoder.\n\n**Callback definition:** *def callback(device, status, message)*")
on_aui_message = event.Event("This event is called when an :py:class`~alarmdecoder.messages.AUIMessage` is received\n\n**Callback definition:** *def callback(device, message)*")

# Low-level Events
on_open = event.Event("This event is called when the device has been opened.\n\n**Callback definition:** *def callback(device)*")
@@ -90,6 +93,8 @@ class AlarmDecoder(object):
"""The status of message deduplication as configured on the device."""
mode = ADEMCO
"""The panel mode that the AlarmDecoder is in. Currently supports ADEMCO and DSC."""
emulate_com = False
"""The status of the devices COM emulation."""

#Version Information
serial_number = 0xFFFFFFFF
@@ -99,25 +104,32 @@ class AlarmDecoder(object):
version_flags = ""
"""Device flags enabled"""

def __init__(self, device):
def __init__(self, device, ignore_message_states=False):
"""
Constructor

:param device: The low-level device used for this `AlarmDecoder`_
interface.
:type device: Device
:param ignore_message_states: Ignore regular panel messages when updating internal states
:type ignore_message_states: bool
"""
self._device = device
self._zonetracker = Zonetracker(self)
self._lrr_system = LRRSystem(self)

self._ignore_message_states = ignore_message_states
self._battery_timeout = AlarmDecoder.BATTERY_TIMEOUT
self._fire_timeout = AlarmDecoder.FIRE_TIMEOUT
self._power_status = None
self._alarm_status = None
self._bypass_status = None
self._bypass_status = {}
self._armed_status = None
self._armed_stay = False
self._fire_status = (False, 0)
self._fire_alarming = False
self._fire_alarming_changed = 0
self._fire_state = FireState.NONE
self._battery_status = (False, 0)
self._panic_status = False
self._relay_status = {}
@@ -134,6 +146,7 @@ class AlarmDecoder(object):
self.emulate_lrr = False
self.deduplicate = False
self.mode = ADEMCO
self.emulate_com = False

self.serial_number = 0xFFFFFFFF
self.version_number = 'Unknown'
@@ -276,6 +289,12 @@ class AlarmDecoder(object):
self.send("C{0}\r".format(self.get_config_string()))

def get_config_string(self):
"""
Build a configuration string that's compatible with the AlarmDecoder configuration
command from the current values in the object.

:returns: string
"""
config_entries = []

# HACK: This is ugly.. but I can't think of an elegant way of doing it.
@@ -289,6 +308,7 @@ class AlarmDecoder(object):
config_entries.append(('LRR', 'Y' if self.emulate_lrr else 'N'))
config_entries.append(('DEDUPLICATE', 'Y' if self.deduplicate else 'N'))
config_entries.append(('MODE', list(PANEL_TYPES)[list(PANEL_TYPES.values()).index(self.mode)]))
config_entries.append(('COM', 'Y' if self.emulate_com else 'N'))

config_string = '&'.join(['='.join(t) for t in config_entries])

@@ -382,6 +402,9 @@ class AlarmDecoder(object):
elif header == '!LRR':
msg = self._handle_lrr(data)

elif header == '!AUI':
msg = self._handle_aui(data)

elif data.startswith('!Ready'):
self.on_boot()

@@ -405,10 +428,14 @@ class AlarmDecoder(object):

:returns: :py:class:`~alarmdecoder.messages.Message`
"""

msg = Message(data)

if self._internal_address_mask & msg.mask > 0:
self._update_internal_states(msg)
if not self._ignore_message_states:
self._update_internal_states(msg)
else:
self._update_fire_status(status=None)

self.on_message(message=msg)

@@ -456,16 +483,23 @@ class AlarmDecoder(object):
"""
msg = LRRMessage(data)

if msg.event_type == 'ALARM_PANIC':
self._panic_status = True
self.on_panic(status=True)
self._lrr_system.update(msg)
self.on_lrr_message(message=msg)

return msg

elif msg.event_type == 'CANCEL':
if self._panic_status is True:
self._panic_status = False
self.on_panic(status=False)
def _handle_aui(self, data):
"""
Handle AUI messages.

self.on_lrr_message(message=msg)
:param data: RF message to parse
:type data: string

:returns: :py:class`~alarmdecoder.messages.AUIMessage`
"""
msg = AUIMessage(data)

self.on_aui_message(message=msg)

return msg

@@ -511,6 +545,8 @@ class AlarmDecoder(object):
self.deduplicate = (val == 'Y')
elif key == 'MODE':
self.mode = PANEL_TYPES[val]
elif key == 'COM':
self.emulate_com = (val == 'Y')

self.on_config_received()

@@ -537,7 +573,7 @@ class AlarmDecoder(object):
:param message: :py:class:`~alarmdecoder.messages.Message` to update internal states with
:type message: :py:class:`~alarmdecoder.messages.Message`, :py:class:`~alarmdecoder.messages.ExpanderMessage`, :py:class:`~alarmdecoder.messages.LRRMessage`, or :py:class:`~alarmdecoder.messages.RFMessage`
"""
if isinstance(message, Message):
if isinstance(message, Message) and not self._ignore_message_states:
self._update_power_status(message)
self._update_alarm_status(message)
self._update_zone_bypass_status(message)
@@ -550,122 +586,237 @@ class AlarmDecoder(object):

self._update_zone_tracker(message)

def _update_power_status(self, message):
def _update_power_status(self, message=None, status=None):
"""
Uses the provided message to update the AC power state.

:param message: message to use to update
:type message: :py:class:`~alarmdecoder.messages.Message`
:param status: power status, overrides message bits.
:type status: bool

:returns: bool indicating the new status
"""
if message.ac_power != self._power_status:
self._power_status, old_status = message.ac_power, self._power_status
power_status = status
if isinstance(message, Message):
power_status = message.ac_power

if power_status is None:
return

if power_status != self._power_status:
self._power_status, old_status = power_status, self._power_status

if old_status is not None:
self.on_power_changed(status=self._power_status)

return self._power_status

def _update_alarm_status(self, message):
def _update_alarm_status(self, message=None, status=None, zone=None, user=None):
"""
Uses the provided message to update the alarm state.

:param message: message to use to update
:type message: :py:class:`~alarmdecoder.messages.Message`
:param status: alarm status, overrides message bits.
:type status: bool
:param user: user associated with alarm event
:type user: string

:returns: bool indicating the new status
"""

if message.alarm_sounding != self._alarm_status:
self._alarm_status, old_status = message.alarm_sounding, self._alarm_status
alarm_status = status
alarm_zone = zone
if isinstance(message, Message):
alarm_status = message.alarm_sounding
alarm_zone = message.parse_numeric_code()

if old_status is not None:
if alarm_status != self._alarm_status:
self._alarm_status, old_status = alarm_status, self._alarm_status

if old_status is not None or status is not None:
if self._alarm_status:
self.on_alarm(zone=message.numeric_code)
self.on_alarm(zone=alarm_zone)
else:
self.on_alarm_restored(zone=message.numeric_code)
self.on_alarm_restored(zone=alarm_zone, user=user)

return self._alarm_status

def _update_zone_bypass_status(self, message):
def _update_zone_bypass_status(self, message=None, status=None, zone=None):
"""
Uses the provided message to update the zone bypass state.

:param message: message to use to update
:type message: :py:class:`~alarmdecoder.messages.Message`
:param status: bypass status, overrides message bits.
:type status: bool
:param zone: zone associated with bypass event
:type zone: int

:returns: bool indicating the new status
"""
bypass_status = status
if isinstance(message, Message):
bypass_status = message.zone_bypassed

if message.zone_bypassed != self._bypass_status:
self._bypass_status, old_status = message.zone_bypassed, self._bypass_status
if bypass_status is None:
return

if old_status is not None:
self.on_bypass(status=self._bypass_status)
old_bypass_status = self._bypass_status.get(zone, None)

if bypass_status != old_bypass_status:
if bypass_status == False and zone is None:
self._bypass_status = {}
else:
self._bypass_status[zone] = bypass_status

if old_bypass_status is not None or message is None or (old_bypass_status is None and bypass_status is True):
self.on_bypass(status=bypass_status, zone=zone)

return self._bypass_status
return bypass_status

def _update_armed_status(self, message):
def _update_armed_status(self, message=None, status=None, status_stay=None):
"""
Uses the provided message to update the armed state.

:param message: message to use to update
:type message: :py:class:`~alarmdecoder.messages.Message`
:param status: armed status, overrides message bits
:type status: bool
:param status_stay: armed stay status, overrides message bits
:type status_stay: bool

:returns: bool indicating the new status
"""
arm_status = status
stay_status = status_stay

self._armed_status, old_status = message.armed_away, self._armed_status
self._armed_stay, old_stay = message.armed_home, self._armed_stay
if message.armed_away != old_status or message.armed_home != old_stay:
if old_status is not None:
if isinstance(message, Message):
arm_status = message.armed_away
stay_status = message.armed_home

if arm_status is None or stay_status is None:
return

self._armed_status, old_status = arm_status, self._armed_status
self._armed_stay, old_stay = stay_status, self._armed_stay
if arm_status != old_status or stay_status != old_stay:
if old_status is not None or message is None:
if self._armed_status or self._armed_stay:
self.on_arm(stay=message.armed_home)
self.on_arm(stay=stay_status)
else:
self.on_disarm()

return self._armed_status or self._armed_stay

def _update_battery_status(self, message):
def _update_battery_status(self, message=None, status=None):
"""
Uses the provided message to update the battery state.

:param message: message to use to update
:type message: :py:class:`~alarmdecoder.messages.Message`
:param status: battery status, overrides message bits
:type status: bool

:returns: boolean indicating the new status
"""
battery_status = status
if isinstance(message, Message):
battery_status = message.battery_low

if battery_status is None:
return

last_status, last_update = self._battery_status
if message.battery_low == last_status:
if battery_status == last_status:
self._battery_status = (last_status, time.time())
else:
if message.battery_low is True or time.time() > last_update + self._battery_timeout:
self._battery_status = (message.battery_low, time.time())
self.on_low_battery(status=message.battery_low)
if battery_status is True or time.time() > last_update + self._battery_timeout:
self._battery_status = (battery_status, time.time())
self.on_low_battery(status=battery_status)

return self._battery_status[0]

def _update_fire_status(self, message):
def _update_fire_status(self, message=None, status=None):
"""
Uses the provided message to update the fire alarm state.

:param message: message to use to update
:type message: :py:class:`~alarmdecoder.messages.Message`
:param status: fire status, overrides message bits
:type status: bool

:returns: boolean indicating the new status
"""
is_lrr = status is not None
fire_status = status
if isinstance(message, Message):
fire_status = message.fire_alarm

last_status, last_update = self._fire_status
if message.fire_alarm == last_status:
self._fire_status = (last_status, time.time())
else:
if message.fire_alarm is True or time.time() > last_update + self._fire_timeout:
self._fire_status = (message.fire_alarm, time.time())
self.on_fire(status=message.fire_alarm)

return self._fire_status[0]
if self._fire_state == FireState.NONE:
# Always move to a FIRE state if detected
if fire_status == True:
self._fire_state = FireState.ALARM
self._fire_status = (fire_status, time.time())

self.on_fire(status=FireState.ALARM)

elif self._fire_state == FireState.ALARM:
# If we've received an LRR CANCEL message, move to ACKNOWLEDGED
if is_lrr and fire_status == False:
self._fire_state = FireState.ACKNOWLEDGED
self._fire_status = (fire_status, time.time())
self.on_fire(status=FireState.ACKNOWLEDGED)
else:
# Handle bouncing status changes and timeout in order to revert back to NONE.
if last_status != fire_status or fire_status == True:
self._fire_status = (fire_status, time.time())
if fire_status == False and time.time() > last_update + self._fire_timeout:
self._fire_state = FireState.NONE
self.on_fire(status=FireState.NONE)

elif self._fire_state == FireState.ACKNOWLEDGED:
# If we've received a second LRR FIRE message after a CANCEL, revert back to FIRE and trigger another event.
if is_lrr and fire_status == True:
self._fire_state = FireState.ALARM
self._fire_status = (fire_status, time.time())

self.on_fire(status=FireState.ALARM)
else:
# Handle bouncing status changes and timeout in order to revert back to NONE.
if last_status != fire_status or fire_status == True:
self._fire_status = (fire_status, time.time())

if fire_status != True and time.time() > last_update + self._fire_timeout:
self._fire_state = FireState.NONE
self.on_fire(status=FireState.NONE)

return self._fire_state == FireState.ALARM


def _update_panic_status(self, status=None):
"""
Updates the panic status of the alarm panel.

:param status: status to use to update
:type status: boolean

:returns: boolean indicating the new status
"""
if status is None:
return

if status != self._panic_status:
self._panic_status, old_status = status, self._panic_status

if old_status is not None:
self.on_panic(status=self._panic_status)

return self._panic_status

def _update_expander_status(self, message):
"""
@@ -708,7 +859,6 @@ class AlarmDecoder(object):
Internal handler for opening the device.
"""
self.get_config()

self.get_version()

self.on_open()


+ 0
- 1282
alarmdecoder/devices.py
File diff suppressed because it is too large
View File


+ 6
- 0
alarmdecoder/devices/__init__.py View File

@@ -0,0 +1,6 @@
from .base_device import Device
from .serial_device import SerialDevice
from .socket_device import SocketDevice
from .usb_device import USBDevice

__all__ = ['Device', 'SerialDevice', 'SocketDevice', 'USBDevice']

+ 147
- 0
alarmdecoder/devices/base_device.py View File

@@ -0,0 +1,147 @@
"""
This module contains the base device type for the `AlarmDecoder`_ (AD2) family.

.. _AlarmDecoder: http://www.alarmdecoder.com

.. moduleauthor:: Scott Petersen <scott@nutech.com>
"""

import threading

from ..util import CommError, TimeoutError, InvalidMessageError
from ..event import event


class Device(object):
"""
Base class for all `AlarmDecoder`_ (AD2) device types.
"""

# Generic device events
on_open = event.Event("This event is called when the device has been opened.\n\n**Callback definition:** *def callback(device)*")
on_close = event.Event("This event is called when the device has been closed.\n\n**Callback definition:** def callback(device)*")
on_read = event.Event("This event is called when a line has been read from the device.\n\n**Callback definition:** def callback(device, data)*")
on_write = event.Event("This event is called when data has been written to the device.\n\n**Callback definition:** def callback(device, data)*")

def __init__(self):
"""
Constructor
"""
self._id = ''
self._buffer = b''
self._device = None
self._running = False
self._read_thread = None

def __enter__(self):
"""
Support for context manager __enter__.
"""
return self

def __exit__(self, exc_type, exc_value, traceback):
"""
Support for context manager __exit__.
"""
self.close()

return False

@property
def id(self):
"""
Retrieve the device ID.

:returns: identification string for the device
"""
return self._id

@id.setter
def id(self, value):
"""
Sets the device ID.

:param value: device identification string
:type value: string
"""
self._id = value

def is_reader_alive(self):
"""
Indicates whether or not the reader thread is alive.

:returns: whether or not the reader thread is alive
"""
return self._read_thread.is_alive()

def stop_reader(self):
"""
Stops the reader thread.
"""
self._read_thread.stop()

def close(self):
"""
Closes the device.
"""
try:
self._running = False
self._read_thread.stop()
self._device.close()

except Exception:
pass

self.on_close()

class ReadThread(threading.Thread):
"""
Reader thread which processes messages from the device.
"""

READ_TIMEOUT = 10
"""Timeout for the reader thread."""

def __init__(self, device):
"""
Constructor

:param device: device used by the reader thread
:type device: :py:class:`~alarmdecoder.devices.Device`
"""
threading.Thread.__init__(self)
self._device = device
self._running = False

def stop(self):
"""
Stops the running thread.
"""
self._running = False

def run(self):
"""
The actual read process.
"""
self._running = True

while self._running:
try:
self._device.read_line(timeout=self.READ_TIMEOUT)

except TimeoutError:
pass

except InvalidMessageError:
pass

except SSL.WantReadError:
pass

except CommError as err:
self._device.close()

except Exception as err:
self._device.close()
self._running = False
raise

+ 278
- 0
alarmdecoder/devices/serial_device.py View File

@@ -0,0 +1,278 @@
"""
This module contains the :py:class:`SerialDevice` interface for the `AD2USB`_, `AD2SERIAL`_ or `AD2PI`_.

.. _AD2USB: http://www.alarmdecoder.com
.. _AD2SERIAL: http://www.alarmdecoder.com
.. _AD2PI: http://www.alarmdecoder.com

.. moduleauthor:: Scott Petersen <scott@nutech.com>
"""

import threading
import serial
import serial.tools.list_ports
import select
import sys
from .base_device import Device
from ..util import CommError, TimeoutError, NoDeviceError, bytes_hack


class SerialDevice(Device):
"""
`AD2USB`_, `AD2SERIAL`_ or `AD2PI`_ device utilizing the PySerial interface.
"""

# Constants
BAUDRATE = 19200
"""Default baudrate for Serial devices."""

@staticmethod
def find_all(pattern=None):
"""
Returns all serial ports present.

:param pattern: pattern to search for when retrieving serial ports
:type pattern: string

:returns: list of devices
:raises: :py:class:`~alarmdecoder.util.CommError`
"""
devices = []

try:
if pattern:
devices = serial.tools.list_ports.grep(pattern)
else:
devices = serial.tools.list_ports.comports()

except serial.SerialException as err:
raise CommError('Error enumerating serial devices: {0}'.format(str(err)), err)

return devices

@property
def interface(self):
"""
Retrieves the interface used to connect to the device.

:returns: interface used to connect to the device
"""
return self._port

@interface.setter
def interface(self, value):
"""
Sets the interface used to connect to the device.

:param value: name of the serial device
:type value: string
"""
self._port = value

def __init__(self, interface=None):
"""
Constructor

:param interface: device to open
:type interface: string
"""
Device.__init__(self)

self._port = interface
self._id = interface
# Timeout = non-blocking to match pyftdi.
self._device = serial.Serial(timeout=0, writeTimeout=0)

def open(self, baudrate=BAUDRATE, no_reader_thread=False):
"""
Opens the device.

:param baudrate: baudrate to use with the device
:type baudrate: int
:param no_reader_thread: whether or not to automatically start the
reader thread.
:type no_reader_thread: bool

:raises: :py:class:`~alarmdecoder.util.NoDeviceError`
"""
# Set up the defaults
if baudrate is None:
baudrate = SerialDevice.BAUDRATE

if self._port is None:
raise NoDeviceError('No device interface specified.')

self._read_thread = Device.ReadThread(self)

# Open the device and start up the reader thread.
try:
self._device.port = self._port
self._device.open()
# NOTE: Setting the baudrate before opening the
# port caused issues with Moschip 7840/7820
# USB Serial Driver converter. (mos7840)
#
# Moving it to this point seems to resolve
# all issues with it.
self._device.baudrate = baudrate

except (serial.SerialException, ValueError, OSError) as err:
raise NoDeviceError('Error opening device on {0}.'.format(self._port), err)

else:
self._running = True
self.on_open()

if not no_reader_thread:
self._read_thread.start()

return self

def close(self):
"""
Closes the device.
"""
try:
Device.close(self)

except Exception:
pass

def fileno(self):
"""
Returns the file number associated with the device
:returns: int
"""
return self._device.fileno()

def write(self, data):
"""
Writes data to the device.

:param data: data to write
:type data: string

:raises: py:class:`~alarmdecoder.util.CommError`
"""
try:
# Hack to support unicode under Python 2.x
if isinstance(data, str) or (sys.version_info < (3,) and isinstance(data, unicode)):
data = data.encode('utf-8')

self._device.write(data)

except serial.SerialTimeoutException:
pass

except serial.SerialException as err:
raise CommError('Error writing to device.', err)

else:
self.on_write(data=data)

def read(self):
"""
Reads a single character from the device.

:returns: character read from the device
:raises: :py:class:`~alarmdecoder.util.CommError`
"""
data = ''

try:
read_ready, _, _ = select.select([self._device.fileno()], [], [], 0.5)

if len(read_ready) != 0:
data = self._device.read(1)

except serial.SerialException as err:
raise CommError('Error reading from device: {0}'.format(str(err)), err)

return data.decode('utf-8')

def read_line(self, timeout=0.0, purge_buffer=False):
"""
Reads a line from the device.

:param timeout: read timeout
:type timeout: float
:param purge_buffer: Indicates whether to purge the buffer prior to
reading.
:type purge_buffer: bool

:returns: line that was read
:raises: :py:class:`~alarmdecoder.util.CommError`, :py:class:`~alarmdecoder.util.TimeoutError`
"""

def timeout_event():
"""Handles read timeout event"""
timeout_event.reading = False
timeout_event.reading = True

if purge_buffer:
self._buffer = b''

got_line, data = False, ''

timer = threading.Timer(timeout, timeout_event)
if timeout > 0:
timer.start()

leftovers = b''
try:
while timeout_event.reading and not got_line:
read_ready, _, _ = select.select([self._device.fileno()], [], [], 0.5)
if len(read_ready) == 0:
continue

bytes_avail = 0
if hasattr(self._device, "in_waiting"):
bytes_avail = self._device.in_waiting
else:
bytes_avail = self._device.inWaiting()

buf = self._device.read(bytes_avail)

for idx in range(len(buf)):
c = buf[idx]

ub = bytes_hack(c)
if sys.version_info > (3,):
ub = bytes([ub])

# NOTE: AD2SERIAL and AD2PI apparently sends down \xFF on boot.
if ub != b'' and ub != b"\xff":
self._buffer += ub

if ub == b"\n":
self._buffer = self._buffer.strip(b"\r\n")

if len(self._buffer) > 0:
got_line = True
leftovers = buf[idx:]
break

except (OSError, serial.SerialException) as err:
raise CommError('Error reading from device: {0}'.format(str(err)), err)

else:
if got_line:
data, self._buffer = self._buffer, leftovers

self.on_read(data=data)

else:
raise TimeoutError('Timeout while waiting for line terminator.')

finally:
timer.cancel()

return data.decode('utf-8')

def purge(self):
"""
Purges read/write buffers.
"""
self._device.flushInput()
self._device.flushOutput()

+ 399
- 0
alarmdecoder/devices/socket_device.py View File

@@ -0,0 +1,399 @@
"""
This module contains :py:class:`SocketDevice` interface for `AlarmDecoder`_ devices
that are exposed through `ser2sock`_ or another IP to serial solution. Also supports
SSL if using `ser2sock`_.

.. _ser2sock: http://github.com/nutechsoftware/ser2sock
.. _AlarmDecoder: http://www.alarmdecoder.com

.. moduleauthor:: Scott Petersen <scott@nutech.com>
"""

import threading
import socket
import select
from .base_device import Device
from ..util import CommError, TimeoutError, NoDeviceError, bytes_hack

try:
from OpenSSL import SSL, crypto

have_openssl = True

except ImportError:
class SSL:
class Error(BaseException):
pass

class WantReadError(BaseException):
pass

class SysCallError(BaseException):
pass

have_openssl = False


class SocketDevice(Device):
"""
Device that supports communication with an `AlarmDecoder`_ (AD2) that is
exposed via `ser2sock`_ or another Serial to IP interface.
"""

@property
def interface(self):
"""
Retrieves the interface used to connect to the device.

:returns: interface used to connect to the device
"""
return (self._host, self._port)

@interface.setter
def interface(self, value):
"""
Sets the interface used to connect to the device.

:param value: Tuple containing the host and port to use
:type value: tuple
"""
self._host, self._port = value

@property
def ssl(self):
"""
Retrieves whether or not the device is using SSL.

:returns: whether or not the device is using SSL
"""
return self._use_ssl

@ssl.setter
def ssl(self, value):
"""
Sets whether or not SSL communication is in use.

:param value: Whether or not SSL communication is in use
:type value: bool
"""
self._use_ssl = value

@property
def ssl_certificate(self):
"""
Retrieves the SSL client certificate path used for authentication.

:returns: path to the certificate path or :py:class:`OpenSSL.crypto.X509`
"""
return self._ssl_certificate

@ssl_certificate.setter
def ssl_certificate(self, value):
"""
Sets the SSL client certificate to use for authentication.

:param value: path to the SSL certificate or :py:class:`OpenSSL.crypto.X509`
:type value: string or :py:class:`OpenSSL.crypto.X509`
"""
self._ssl_certificate = value

@property
def ssl_key(self):
"""
Retrieves the SSL client certificate key used for authentication.

:returns: jpath to the SSL key or :py:class:`OpenSSL.crypto.PKey`
"""
return self._ssl_key

@ssl_key.setter
def ssl_key(self, value):
"""
Sets the SSL client certificate key to use for authentication.

:param value: path to the SSL key or :py:class:`OpenSSL.crypto.PKey`
:type value: string or :py:class:`OpenSSL.crypto.PKey`
"""
self._ssl_key = value

@property
def ssl_ca(self):
"""
Retrieves the SSL Certificate Authority certificate used for
authentication.

:returns: path to the CA certificate or :py:class:`OpenSSL.crypto.X509`
"""
return self._ssl_ca

@ssl_ca.setter
def ssl_ca(self, value):
"""
Sets the SSL Certificate Authority certificate used for authentication.

:param value: path to the SSL CA certificate or :py:class:`OpenSSL.crypto.X509`
:type value: string or :py:class:`OpenSSL.crypto.X509`
"""
self._ssl_ca = value

def __init__(self, interface=("localhost", 10000)):
"""
Constructor

:param interface: Tuple containing the hostname and port of our target
:type interface: tuple
"""
Device.__init__(self)

self._host, self._port = interface
self._use_ssl = False
self._ssl_certificate = None
self._ssl_key = None
self._ssl_ca = None

def open(self, baudrate=None, no_reader_thread=False):
"""
Opens the device.

:param baudrate: baudrate to use
:type baudrate: int
:param no_reader_thread: whether or not to automatically open the reader
thread.
:type no_reader_thread: bool

:raises: :py:class:`~alarmdecoder.util.NoDeviceError`, :py:class:`~alarmdecoder.util.CommError`
"""

try:
self._read_thread = Device.ReadThread(self)

self._device = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

if self._use_ssl:
self._init_ssl()

self._device.connect((self._host, self._port))

if self._use_ssl:
while True:
try:
self._device.do_handshake()
break
except SSL.WantReadError:
pass

self._id = '{0}:{1}'.format(self._host, self._port)

except socket.error as err:
raise NoDeviceError('Error opening device at {0}:{1}'.format(self._host, self._port), err)

else:
self._running = True
self.on_open()

if not no_reader_thread:
self._read_thread.start()

return self

def close(self):
"""
Closes the device.
"""
try:
# TODO: Find a way to speed up this shutdown.
if self.ssl:
self._device.shutdown()

else:
# Make sure that it closes immediately.
self._device.shutdown(socket.SHUT_RDWR)

except Exception:
pass

Device.close(self)

def fileno(self):
"""
Returns the file number associated with the device
:returns: int
"""
return self._device.fileno()

def write(self, data):
"""
Writes data to the device.

:param data: data to write
:type data: string

:returns: number of bytes sent
:raises: :py:class:`~alarmdecoder.util.CommError`
"""
data_sent = None

try:
if isinstance(data, str):
data = data.encode('utf-8')

data_sent = self._device.send(data)

if data_sent == 0:
raise CommError('Error writing to device.')

self.on_write(data=data)

except (SSL.Error, socket.error) as err:
raise CommError('Error writing to device.', err)

return data_sent

def read(self):
"""
Reads a single character from the device.

:returns: character read from the device
:raises: :py:class:`~alarmdecoder.util.CommError`
"""
data = ''

try:
read_ready, _, _ = select.select([self._device], [], [], 0.5)

if len(read_ready) != 0:
data = self._device.recv(1)

except socket.error as err:
raise CommError('Error while reading from device: {0}'.format(str(err)), err)

return data.decode('utf-8')

def read_line(self, timeout=0.0, purge_buffer=False):
"""
Reads a line from the device.

:param timeout: read timeout
:type timeout: float
:param purge_buffer: Indicates whether to purge the buffer prior to
reading.
:type purge_buffer: bool

:returns: line that was read
:raises: :py:class:`~alarmdecoder.util.CommError`, :py:class:`~alarmdecoder.util.TimeoutError`
"""

def timeout_event():
"""Handles read timeout event"""
timeout_event.reading = False
timeout_event.reading = True

if purge_buffer:
self._buffer = b''

got_line, ret = False, None

timer = threading.Timer(timeout, timeout_event)
if timeout > 0:
timer.start()

try:
while timeout_event.reading:
read_ready, _, _ = select.select([self._device], [], [], 0.5)

if len(read_ready) == 0:
continue

buf = self._device.recv(1)

if buf != b'' and buf != b"\xff":
ub = bytes_hack(buf)

self._buffer += ub

if ub == b"\n":
self._buffer = self._buffer.rstrip(b"\r\n")

if len(self._buffer) > 0:
got_line = True
break

except socket.error as err:
raise CommError('Error reading from device: {0}'.format(str(err)), err)

except SSL.SysCallError as err:
errno, msg = err
raise CommError('SSL error while reading from device: {0} ({1})'.format(msg, errno))

except Exception:
raise

else:
if got_line:
ret, self._buffer = self._buffer, b''

self.on_read(data=ret)

else:
raise TimeoutError('Timeout while waiting for line terminator.')

finally:
timer.cancel()

return ret.decode('utf-8')

def purge(self):
"""
Purges read/write buffers.
"""
try:
self._device.setblocking(0)
while(self._device.recv(1)):
pass
except socket.error as err:
pass
finally:
self._device.setblocking(1)

def _init_ssl(self):
"""
Initializes our device as an SSL connection.

:raises: :py:class:`~alarmdecoder.util.CommError`
"""

if not have_openssl:
raise ImportError('SSL sockets have been disabled due to missing requirement: pyopenssl.')

try:
ctx = SSL.Context(SSL.TLSv1_METHOD)

if isinstance(self.ssl_key, crypto.PKey):
ctx.use_privatekey(self.ssl_key)
else:
ctx.use_privatekey_file(self.ssl_key)

if isinstance(self.ssl_certificate, crypto.X509):
ctx.use_certificate(self.ssl_certificate)
else:
ctx.use_certificate_file(self.ssl_certificate)

if isinstance(self.ssl_ca, crypto.X509):
store = ctx.get_cert_store()
store.add_cert(self.ssl_ca)
else:
ctx.load_verify_locations(self.ssl_ca, None)

ctx.set_verify(SSL.VERIFY_PEER, self._verify_ssl_callback)

self._device = SSL.Connection(ctx, self._device)

except SSL.Error as err:
raise CommError('Error setting up SSL connection.', err)

def _verify_ssl_callback(self, connection, x509, errnum, errdepth, ok):
"""
SSL verification callback.
"""
return ok

+ 490
- 0
alarmdecoder/devices/usb_device.py View File

@@ -0,0 +1,490 @@
"""
This module contains the :py:class:`USBDevice` interface for the `AD2USB`_.

.. _AD2USB: http://www.alarmdecoder.com

.. moduleauthor:: Scott Petersen <scott@nutech.com>
"""

import time
import threading
from .base_device import Device
from ..util import CommError, TimeoutError, NoDeviceError, bytes_hack
from ..event import event

have_pyftdi = False
try:
from pyftdi.pyftdi.ftdi import Ftdi, FtdiError
import usb.core
import usb.util

have_pyftdi = True

except ImportError:
try:
from pyftdi.ftdi import Ftdi, FtdiError
import usb.core
import usb.util

have_pyftdi = True

except ImportError:
have_pyftdi = False


class USBDevice(Device):
"""
`AD2USB`_ device utilizing PyFTDI's interface.
"""

# Constants
PRODUCT_IDS = ((0x0403, 0x6001), (0x0403, 0x6015))
"""List of Vendor and Product IDs used to recognize `AD2USB`_ devices."""
DEFAULT_VENDOR_ID = PRODUCT_IDS[0][0]
"""Default Vendor ID used to recognize `AD2USB`_ devices."""
DEFAULT_PRODUCT_ID = PRODUCT_IDS[0][1]
"""Default Product ID used to recognize `AD2USB`_ devices."""

# Deprecated constants
FTDI_VENDOR_ID = DEFAULT_VENDOR_ID
"""DEPRECATED: Vendor ID used to recognize `AD2USB`_ devices."""
FTDI_PRODUCT_ID = DEFAULT_PRODUCT_ID
"""DEPRECATED: Product ID used to recognize `AD2USB`_ devices."""


BAUDRATE = 115200
"""Default baudrate for `AD2USB`_ devices."""

__devices = []
__detect_thread = None

@classmethod
def find_all(cls, vid=None, pid=None):
"""
Returns all FTDI devices matching our vendor and product IDs.

:returns: list of devices
:raises: :py:class:`~alarmdecoder.util.CommError`
"""
if not have_pyftdi:
raise ImportError('The USBDevice class has been disabled due to missing requirement: pyftdi or pyusb.')

cls.__devices = []

query = cls.PRODUCT_IDS
if vid and pid:
query = [(vid, pid)]

try:
cls.__devices = Ftdi.find_all(query, nocache=True)

except (usb.core.USBError, FtdiError) as err:
raise CommError('Error enumerating AD2USB devices: {0}'.format(str(err)), err)

return cls.__devices

@classmethod
def devices(cls):
"""
Returns a cached list of `AD2USB`_ devices located on the system.

:returns: cached list of devices found
"""
return cls.__devices

@classmethod
def find(cls, device=None):
"""
Factory method that returns the requested :py:class:`USBDevice` device, or the
first device.

:param device: Tuple describing the USB device to open, as returned
by find_all().
:type device: tuple

:returns: :py:class:`USBDevice` object utilizing the specified device
:raises: :py:class:`~alarmdecoder.util.NoDeviceError`
"""
if not have_pyftdi:
raise ImportError('The USBDevice class has been disabled due to missing requirement: pyftdi or pyusb.')

cls.find_all()

if len(cls.__devices) == 0:
raise NoDeviceError('No AD2USB devices present.')

if device is None:
device = cls.__devices[0]

vendor, product, sernum, ifcount, description = device

return USBDevice(interface=sernum, vid=vendor, pid=product)

@classmethod
def start_detection(cls, on_attached=None, on_detached=None):
"""
Starts the device detection thread.

:param on_attached: function to be called when a device is attached **Callback definition:** *def callback(thread, device)*
:type on_attached: function
:param on_detached: function to be called when a device is detached **Callback definition:** *def callback(thread, device)*

:type on_detached: function
"""
if not have_pyftdi:
raise ImportError('The USBDevice class has been disabled due to missing requirement: pyftdi or pyusb.')

cls.__detect_thread = USBDevice.DetectThread(on_attached, on_detached)

try:
cls.find_all()
except CommError:
pass

cls.__detect_thread.start()

@classmethod
def stop_detection(cls):
"""
Stops the device detection thread.
"""
if not have_pyftdi:
raise ImportError('The USBDevice class has been disabled due to missing requirement: pyftdi or pyusb.')

try:
cls.__detect_thread.stop()

except Exception:
pass

@property
def interface(self):
"""
Retrieves the interface used to connect to the device.

:returns: the interface used to connect to the device
"""
return self._interface

@interface.setter
def interface(self, value):
"""
Sets the interface used to connect to the device.

:param value: may specify either the serial number or the device index
:type value: string or int
"""
self._interface = value
if isinstance(value, int):
self._device_number = value
else:
self._serial_number = value

@property
def serial_number(self):
"""
Retrieves the serial number of the device.

:returns: serial number of the device
"""

return self._serial_number

@serial_number.setter
def serial_number(self, value):
"""
Sets the serial number of the device.

:param value: serial number of the device
:type value: string
"""
self._serial_number = value

@property
def description(self):
"""
Retrieves the description of the device.

:returns: description of the device
"""
return self._description

@description.setter
def description(self, value):
"""
Sets the description of the device.

:param value: description of the device
:type value: string
"""
self._description = value

def __init__(self, interface=0, vid=None, pid=None):
"""
Constructor

:param interface: May specify either the serial number or the device
index.
:type interface: string or int
"""
if not have_pyftdi:
raise ImportError('The USBDevice class has been disabled due to missing requirement: pyftdi or pyusb.')

Device.__init__(self)

self._device = Ftdi()

self._interface = 0
self._device_number = 0
self._serial_number = None

self._vendor_id = USBDevice.DEFAULT_VENDOR_ID
if vid:
self._vendor_id = vid

self._product_id = USBDevice.DEFAULT_PRODUCT_ID
if pid:
self._product_id = pid

self._endpoint = 0
self._description = None

self.interface = interface

def open(self, baudrate=BAUDRATE, no_reader_thread=False):
"""
Opens the device.

:param baudrate: baudrate to use
:type baudrate: int
:param no_reader_thread: whether or not to automatically start the
reader thread.
:type no_reader_thread: bool

:raises: :py:class:`~alarmdecoder.util.NoDeviceError`
"""
# Set up defaults
if baudrate is None:
baudrate = USBDevice.BAUDRATE

self._read_thread = Device.ReadThread(self)

# Open the device and start up the thread.
try:
self._device.open(self._vendor_id,
self._product_id,
self._endpoint,
self._device_number,
self._serial_number,
self._description)

self._device.set_baudrate(baudrate)

if not self._serial_number:
self._serial_number = self._get_serial_number()

self._id = self._serial_number

except (usb.core.USBError, FtdiError) as err:
raise NoDeviceError('Error opening device: {0}'.format(str(err)), err)

except KeyError as err:
raise NoDeviceError('Unsupported device. ({0:04x}:{1:04x}) You probably need a newer version of pyftdi.'.format(err[0][0], err[0][1]))

else:
self._running = True
self.on_open()

if not no_reader_thread:
self._read_thread.start()

return self

def close(self):
"""
Closes the device.
"""
try:
Device.close(self)

# HACK: Probably should fork pyftdi and make this call in .close()
self._device.usb_dev.attach_kernel_driver(self._device_number)

except Exception:
pass

def fileno(self):
"""
File number not supported for USB devices.
:raises: NotImplementedError
"""
raise NotImplementedError('USB devices do not support fileno()')

def write(self, data):
"""
Writes data to the device.

:param data: data to write
:type data: string

:raises: :py:class:`~alarmdecoder.util.CommError`
"""
try:
self._device.write_data(data)

self.on_write(data=data)

except FtdiError as err:
raise CommError('Error writing to device: {0}'.format(str(err)), err)

def read(self):
"""
Reads a single character from the device.

:returns: character read from the device
:raises: :py:class:`~alarmdecoder.util.CommError`
"""
ret = None

try:
ret = self._device.read_data(1)

except (usb.core.USBError, FtdiError) as err:
raise CommError('Error reading from device: {0}'.format(str(err)), err)

return ret

def read_line(self, timeout=0.0, purge_buffer=False):
"""
Reads a line from the device.

:param timeout: read timeout
:type timeout: float
:param purge_buffer: Indicates whether to purge the buffer prior to
reading.
:type purge_buffer: bool

:returns: line that was read
:raises: :py:class:`~alarmdecoder.util.CommError`, :py:class:`~alarmdecoder.util.TimeoutError`
"""

def timeout_event():
"""Handles read timeout event"""
timeout_event.reading = False
timeout_event.reading = True

if purge_buffer:
self._buffer = b''

got_line, ret = False, None

timer = threading.Timer(timeout, timeout_event)
if timeout > 0:
timer.start()

try:
while timeout_event.reading:
buf = self._device.read_data(1)

if buf != b'':
ub = bytes_hack(buf)

self._buffer += ub

if ub == b"\n":
self._buffer = self._buffer.rstrip(b"\r\n")

if len(self._buffer) > 0:
got_line = True
break
else:
time.sleep(0.01)

except (usb.core.USBError, FtdiError) as err:
raise CommError('Error reading from device: {0}'.format(str(err)), err)

else:
if got_line:
ret, self._buffer = self._buffer, b''

self.on_read(data=ret)

else:
raise TimeoutError('Timeout while waiting for line terminator.')

finally:
timer.cancel()

return ret

def purge(self):
"""
Purges read/write buffers.
"""
self._device.purge_buffers()

def _get_serial_number(self):
"""
Retrieves the FTDI device serial number.

:returns: string containing the device serial number
"""
return usb.util.get_string(self._device.usb_dev, 64, self._device.usb_dev.iSerialNumber)

class DetectThread(threading.Thread):
"""
Thread that handles detection of added/removed devices.
"""
on_attached = event.Event("This event is called when an `AD2USB`_ device has been detected.\n\n**Callback definition:** def callback(thread, device*")
on_detached = event.Event("This event is called when an `AD2USB`_ device has been removed.\n\n**Callback definition:** def callback(thread, device*")

def __init__(self, on_attached=None, on_detached=None):
"""
Constructor

:param on_attached: Function to call when a device is attached **Callback definition:** *def callback(thread, device)*
:type on_attached: function
:param on_detached: Function to call when a device is detached **Callback definition:** *def callback(thread, device)*
:type on_detached: function
"""
threading.Thread.__init__(self)

if on_attached:
self.on_attached += on_attached

if on_detached:
self.on_detached += on_detached

self._running = False

def stop(self):
"""
Stops the thread.
"""
self._running = False

def run(self):
"""
The actual detection process.
"""
self._running = True

last_devices = set()

while self._running:
try:
current_devices = set(USBDevice.find_all())

for dev in current_devices.difference(last_devices):
self.on_attached(device=dev)

for dev in last_devices.difference(current_devices):
self.on_detached(device=dev)

last_devices = current_devices

except CommError:
pass

time.sleep(0.25)

+ 0
- 410
alarmdecoder/messages.py View File

@@ -1,410 +0,0 @@
"""
Message representations received from the panel through the `AlarmDecoder`_ (AD2)
devices.

* :py:class:`Message`: The standard and most common message received from a panel.
* :py:class:`ExpanderMessage`: Messages received from Relay or Zone expander modules.
* :py:class:`RFMessage`: Message received from an RF receiver module.
* :py:class:`LRRMessage`: Message received from a long-range radio module.

.. _AlarmDecoder: http://www.alarmdecoder.com

.. moduleauthor:: Scott Petersen <scott@nutech.com>
"""

import re
import datetime

try:
from reprlib import repr
except ImportError:
from repr import repr

from .util import InvalidMessageError
from .panels import PANEL_TYPES, ADEMCO, DSC


class BaseMessage(object):
"""
Base class for messages.
"""

raw = None
"""The raw message text"""

timestamp = None
"""The timestamp of the message"""

def __init__(self):
"""
Constructor
"""
self.timestamp = datetime.datetime.now()

def __str__(self):
"""
String conversion operator.
"""
return self.raw

def dict(self, **kwargs):
"""
Dictionary representation.
"""
return dict(
time=self.timestamp,
mesg=self.raw,
**kwargs
)

def __repr__(self):
"""
String representation.
"""
return repr(self.dict())


class Message(BaseMessage):
"""
Represents a message from the alarm panel.
"""

ready = False
"""Indicates whether or not the panel is in a ready state."""
armed_away = False
"""Indicates whether or not the panel is armed away."""
armed_home = False
"""Indicates whether or not the panel is armed home."""
backlight_on = False
"""Indicates whether or not the keypad backlight is on."""
programming_mode = False
"""Indicates whether or not we're in programming mode."""
beeps = -1
"""Number of beeps associated with a message."""
zone_bypassed = False
"""Indicates whether or not a zone is bypassed."""
ac_power = False
"""Indicates whether or not the panel is on AC power."""
chime_on = False
"""Indicates whether or not the chime is enabled."""
alarm_event_occurred = False
"""Indicates whether or not an alarm event has occurred."""
alarm_sounding = False
"""Indicates whether or not an alarm is sounding."""
battery_low = False
"""Indicates whether or not there is a low battery."""
entry_delay_off = False
"""Indicates whether or not the entry delay is enabled."""
fire_alarm = False
"""Indicates whether or not a fire alarm is sounding."""
check_zone = False
"""Indicates whether or not there are zones that require attention."""
perimeter_only = False
"""Indicates whether or not the perimeter is armed."""
system_fault = False
"""Indicates whether a system fault has occurred."""
panel_type = ADEMCO
"""Indicates which panel type was the source of this message."""
numeric_code = None
"""The numeric code associated with the message."""
text = None
"""The human-readable text to be displayed on the panel LCD."""
cursor_location = -1
"""Current cursor location on the keypad."""
mask = 0xFFFFFFFF
"""Address mask this message is intended for."""
bitfield = None
"""The bitfield associated with this message."""
panel_data = None
"""The panel data field associated with this message."""

def __init__(self, data=None):
"""
Constructor

:param data: message data to parse
:type data: string
"""
BaseMessage.__init__(self)

self._regex = re.compile('^(!KPM:){0,1}(\[[a-fA-F0-9\-]+\]),([a-fA-F0-9]+),(\[[a-fA-F0-9]+\]),(".+")$')

if data is not None:
self._parse_message(data)

def _parse_message(self, data):
"""
Parse the message from the device.

:param data: message data
:type data: string

:raises: :py:class:`~alarmdecoder.util.InvalidMessageError`
"""
match = self._regex.match(str(data))

if match is None:
raise InvalidMessageError('Received invalid message: {0}'.format(data))

header, self.bitfield, self.numeric_code, self.panel_data, alpha = match.group(1, 2, 3, 4, 5)

is_bit_set = lambda bit: not self.bitfield[bit] == "0"

self.raw = data
self.ready = is_bit_set(1)
self.armed_away = is_bit_set(2)
self.armed_home = is_bit_set(3)
self.backlight_on = is_bit_set(4)
self.programming_mode = is_bit_set(5)
self.beeps = int(self.bitfield[6], 16)
self.zone_bypassed = is_bit_set(7)
self.ac_power = is_bit_set(8)
self.chime_on = is_bit_set(9)
self.alarm_event_occurred = is_bit_set(10)
self.alarm_sounding = is_bit_set(11)
self.battery_low = is_bit_set(12)
self.entry_delay_off = is_bit_set(13)
self.fire_alarm = is_bit_set(14)
self.check_zone = is_bit_set(15)
self.perimeter_only = is_bit_set(16)
self.system_fault = is_bit_set(17)
if self.bitfield[18] in list(PANEL_TYPES):
self.panel_type = PANEL_TYPES[self.bitfield[18]]
# pos 20-21 - Unused.
self.text = alpha.strip('"')
self.mask = int(self.panel_data[3:3+8], 16)

if self.panel_type in (ADEMCO, DSC):
if int(self.panel_data[19:21], 16) & 0x01 > 0:
# Current cursor location on the alpha display.
self.cursor_location = int(self.panel_data[21:23], 16)

def dict(self, **kwargs):
"""
Dictionary representation.
"""
return dict(
time = self.timestamp,
bitfield = self.bitfield,
numeric_code = self.numeric_code,
panel_data = self.panel_data,
mask = self.mask,
ready = self.ready,
armed_away = self.armed_away,
armed_home = self.armed_home,
backlight_on = self.backlight_on,
programming_mode = self.programming_mode,
beeps = self.beeps,
zone_bypassed = self.zone_bypassed,
ac_power = self.ac_power,
chime_on = self.chime_on,
alarm_event_occurred = self.alarm_event_occurred,
alarm_sounding = self.alarm_sounding,
battery_low = self.battery_low,
entry_delay_off = self.entry_delay_off,
fire_alarm = self.fire_alarm,
check_zone = self.check_zone,
perimeter_only = self.perimeter_only,
text = self.text,
cursor_location = self.cursor_location,
**kwargs
)


class ExpanderMessage(BaseMessage):
"""
Represents a message from a zone or relay expansion module.
"""

ZONE = 0
"""Flag indicating that the expander message relates to a Zone Expander."""
RELAY = 1
"""Flag indicating that the expander message relates to a Relay Expander."""

type = None
"""Expander message type: ExpanderMessage.ZONE or ExpanderMessage.RELAY"""
address = -1
"""Address of expander"""
channel = -1
"""Channel on the expander"""
value = -1
"""Value associated with the message"""

def __init__(self, data=None):
"""
Constructor

:param data: message data to parse
:type data: string
"""
BaseMessage.__init__(self)

if data is not None:
self._parse_message(data)

def _parse_message(self, data):
"""
Parse the raw message from the device.

:param data: message data
:type data: string

:raises: :py:class:`~alarmdecoder.util.InvalidMessageError`
"""
try:
header, values = data.split(':')
address, channel, value = values.split(',')

self.raw = data
self.address = int(address)
self.channel = int(channel)
self.value = int(value)

except ValueError:
raise InvalidMessageError('Received invalid message: {0}'.format(data))

if header == '!EXP':
self.type = ExpanderMessage.ZONE
elif header == '!REL':
self.type = ExpanderMessage.RELAY
else:
raise InvalidMessageError('Unknown expander message header: {0}'.format(data))

def dict(self, **kwargs):
"""
Dictionary representation.
"""
return dict(
time = self.timestamp,
address = self.address,
channel = self.channel,
value = self.value,
**kwargs
)


class RFMessage(BaseMessage):
"""
Represents a message from an RF receiver.
"""

serial_number = None
"""Serial number of the RF device."""
value = -1
"""Value associated with this message."""
battery = False
"""Low battery indication"""
supervision = False
"""Supervision required indication"""
loop = [False for _ in list(range(4))]
"""Loop indicators"""

def __init__(self, data=None):
"""
Constructor

:param data: message data to parse
:type data: string
"""
BaseMessage.__init__(self)

if data is not None:
self._parse_message(data)

def _parse_message(self, data):
"""
Parses the raw message from the device.

:param data: message data
:type data: string

:raises: :py:class:`~alarmdecoder.util.InvalidMessageError`
"""
try:
self.raw = data

_, values = data.split(':')
self.serial_number, self.value = values.split(',')
self.value = int(self.value, 16)

is_bit_set = lambda b: self.value & (1 << (b - 1)) > 0

# Bit 1 = unknown
self.battery = is_bit_set(2)
self.supervision = is_bit_set(3)
# Bit 4 = unknown
self.loop[2] = is_bit_set(5)
self.loop[1] = is_bit_set(6)
self.loop[3] = is_bit_set(7)
self.loop[0] = is_bit_set(8)

except ValueError:
raise InvalidMessageError('Received invalid message: {0}'.format(data))

def dict(self, **kwargs):
"""
Dictionary representation.
"""
return dict(
time = self.timestamp,
serial_number = self.serial_number,
value = self.value,
battery = self.battery,
supervision = self.supervision,
**kwargs
)


class LRRMessage(BaseMessage):
"""
Represent a message from a Long Range Radio.
"""

event_data = None
"""Data associated with the LRR message. Usually user ID or zone."""
partition = -1
"""The partition that this message applies to."""
event_type = None
"""The type of the event that occurred."""

def __init__(self, data=None):
"""
Constructor

:param data: message data to parse
:type data: string
"""
BaseMessage.__init__(self)

if data is not None:
self._parse_message(data)

def _parse_message(self, data):
"""
Parses the raw message from the device.

:param data: message data to parse
:type data: string

:raises: :py:class:`~alarmdecoder.util.InvalidMessageError`
"""
try:
self.raw = data

_, values = data.split(':')
self.event_data, self.partition, self.event_type = values.split(',')

except ValueError:
raise InvalidMessageError('Received invalid message: {0}'.format(data))

def dict(self, **kwargs):
"""
Dictionary representation.
"""
return dict(
time = self.timestamp,
event_data = self.event_data,
event_type = self.event_type,
partition = self.partition,
**kwargs
)

+ 9
- 0
alarmdecoder/messages/__init__.py View File

@@ -0,0 +1,9 @@
from .base_message import BaseMessage
from .panel_message import Message
from .expander_message import ExpanderMessage
from .lrr import LRRMessage
from .rf_message import RFMessage
from .aui_message import AUIMessage


__all__ = ['BaseMessage', 'Message', 'ExpanderMessage', 'LRRMessage', 'RFMessage', 'AUIMessage']

+ 47
- 0
alarmdecoder/messages/aui_message.py View File

@@ -0,0 +1,47 @@
"""
Message representations received from the panel through the `AlarmDecoder`_ (AD2)
devices.

:py:class:`AUIMessage`: Message received destined for an AUI keypad.

.. _AlarmDecoder: http://www.alarmdecoder.com

.. moduleauthor:: Scott Petersen <scott@nutech.com>
"""

from . import BaseMessage
from ..util import InvalidMessageError

class AUIMessage(BaseMessage):
"""
Represents a message destined for an AUI keypad.
"""

value = None
"""Raw value of the AUI message"""

def __init__(self, data=None):
"""
Constructor

:param data: message data to parse
:type data: string
"""
BaseMessage.__init__(self, data)

if data is not None:
self._parse_message(data)

def _parse_message(self, data):
header, value = data.split(':')

self.value = value

def dict(self, **kwargs):
"""
Dictionary representation.
"""
return dict(
value = self.value,
**kwargs
)

+ 46
- 0
alarmdecoder/messages/base_message.py View File

@@ -0,0 +1,46 @@
import datetime

try:
from repr import repr
except ImportError:
from repr import repr

class BaseMessage(object):
"""
Base class for messages.
"""

raw = None
"""The raw message text"""

timestamp = None
"""The timestamp of the message"""

def __init__(self, data=None):
"""
Constructor
"""
self.timestamp = datetime.datetime.now()
self.raw = data

def __str__(self):
"""
String conversion operator.
"""
return self.raw

def dict(self, **kwargs):
"""
Dictionary representation.
"""
return dict(
time=self.timestamp,
mesg=self.raw,
**kwargs
)

def __repr__(self):
"""
String representation.
"""
return repr(self.dict())

+ 83
- 0
alarmdecoder/messages/expander_message.py View File

@@ -0,0 +1,83 @@
"""
Message representations received from the panel through the `AlarmDecoder`_ (AD2)
devices.

:py:class:`ExpanderMessage`: Messages received from Relay or Zone expander modules.

.. _AlarmDecoder: http://www.alarmdecoder.com

.. moduleauthor:: Scott Petersen <scott@nutech.com>
"""

from . import BaseMessage
from ..util import InvalidMessageError

class ExpanderMessage(BaseMessage):
"""
Represents a message from a zone or relay expansion module.
"""

ZONE = 0
"""Flag indicating that the expander message relates to a Zone Expander."""
RELAY = 1
"""Flag indicating that the expander message relates to a Relay Expander."""

type = None
"""Expander message type: ExpanderMessage.ZONE or ExpanderMessage.RELAY"""
address = -1
"""Address of expander"""
channel = -1
"""Channel on the expander"""
value = -1
"""Value associated with the message"""

def __init__(self, data=None):
"""
Constructor

:param data: message data to parse
:type data: string
"""
BaseMessage.__init__(self, data)

if data is not None:
self._parse_message(data)

def _parse_message(self, data):
"""
Parse the raw message from the device.

:param data: message data
:type data: string

:raises: :py:class:`~alarmdecoder.util.InvalidMessageError`
"""
try:
header, values = data.split(':')
address, channel, value = values.split(',')

self.address = int(address)
self.channel = int(channel)
self.value = int(value)

except ValueError:
raise InvalidMessageError('Received invalid message: {0}'.format(data))

if header == '!EXP':
self.type = ExpanderMessage.ZONE
elif header == '!REL':
self.type = ExpanderMessage.RELAY
else:
raise InvalidMessageError('Unknown expander message header: {0}'.format(data))

def dict(self, **kwargs):
"""
Dictionary representation.
"""
return dict(
time = self.timestamp,
address = self.address,
channel = self.channel,
value = self.value,
**kwargs
)

+ 9
- 0
alarmdecoder/messages/lrr/__init__.py View File

@@ -0,0 +1,9 @@
from .message import LRRMessage
from .system import LRRSystem
from .events import get_event_description, get_event_source, LRR_EVENT_TYPE, LRR_EVENT_STATUS, LRR_CID_EVENT, LRR_DSC_EVENT, LRR_ADEMCO_EVENT, \
LRR_ALARMDECODER_EVENT, LRR_UNKNOWN_EVENT, LRR_CID_MAP, LRR_DSC_MAP, LRR_ADEMCO_MAP, \
LRR_ALARMDECODER_MAP, LRR_UNKNOWN_MAP

__all__ = ['get_event_description', 'get_event_source', 'LRRMessage', 'LRR_EVENT_TYPE', 'LRR_EVENT_STATUS', 'LRR_CID_EVENT', 'LRR_DSC_EVENT',
'LRR_ADEMCO_EVENT', 'LRR_ALARMDECODER_EVENT', 'LRR_UNKNOWN_EVENT', 'LRR_CID_MAP',
'LRR_DSC_MAP', 'LRR_ADEMCO_MAP', 'LRR_ALARMDECODER_MAP', 'LRR_UNKNOWN_MAP']

+ 819
- 0
alarmdecoder/messages/lrr/events.py View File

@@ -0,0 +1,819 @@
"""
Constants and utility functions used for LRR event handling.

.. moduleauthor:: Scott Petersen <scott@nutech.com>
"""

def get_event_description(event_type, event_code):
"""
Retrieves the human-readable description of an LRR event.

:param event_type: Base LRR event type. Use LRR_EVENT_TYPE.*
:type event_type: int
:param event_code: LRR event code
:type event_code: int

:returns: string
"""
description = 'Unknown'
lookup_map = LRR_TYPE_MAP.get(event_type, None)

if lookup_map is not None:
description = lookup_map.get(event_code, description)

return description

def get_event_source(prefix):
"""
Retrieves the LRR_EVENT_TYPE corresponding to the prefix provided.abs

:param prefix: Prefix to convert to event type
:type prefix: string

:returns: int
"""
source = LRR_EVENT_TYPE.UNKNOWN

if prefix == 'CID':
source = LRR_EVENT_TYPE.CID
elif prefix == 'DSC':
source = LRR_EVENT_TYPE.DSC
elif prefix == 'AD2':
source = LRR_EVENT_TYPE.ALARMDECODER
elif prefix == 'ADEMCO':
source = LRR_EVENT_TYPE.ADEMCO

return source


class LRR_EVENT_TYPE:
"""
Base LRR event types
"""
CID = 1
DSC = 2
ADEMCO = 3
ALARMDECODER = 4
UNKNOWN = 5


class LRR_EVENT_STATUS:
"""
LRR event status codes
"""
TRIGGER = 1
RESTORE = 3


class LRR_CID_EVENT:
"""
ContactID event codes
"""
MEDICAL = 0x100
MEDICAL_PENDANT = 0x101
MEDICAL_FAIL_TO_REPORT = 0x102
# 103-108: ?
TAMPER_ZONE = 0x109 # NOTE: Where did we find this?
FIRE = 0x110
FIRE_SMOKE = 0x111
FIRE_COMBUSTION = 0x112
FIRE_WATER_FLOW = 0x113
FIRE_HEAT = 0x114
FIRE_PULL_STATION = 0x115
FIRE_DUCT = 0x116
FIRE_FLAME = 0x117
FIRE_NEAR_ALARM = 0x118
PANIC = 0x120
PANIC_DURESS = 0x121
PANIC_SILENT = 0x122
PANIC_AUDIBLE = 0x123
PANIC_DURESS_ACCESS_GRANTED = 0x124
PANIC_DURESS_EGRESS_GRANTED = 0x125
PANIC_HOLDUP_SUSPICION = 0x126
# 127-128: ?
PANIC_HOLDUP_VERIFIER = 0x129
BURGLARY = 0x130
BURGLARY_PERIMETER = 0x131
BURGLARY_INTERIOR = 0x132
BURGLARY_AUX = 0x133
BURGLARY_ENTRYEXIT = 0x134
BURGLARY_DAYNIGHT = 0x135
BURGLARY_OUTDOOR = 0x136
BURGLARY_TAMPER = 0x137
BURGLARY_NEAR_ALARM = 0x138
BURGLARY_INTRUSION_VERIFIER = 0x139
ALARM_GENERAL = 0x140
ALARM_POLLING_LOOP_OPEN = 0x141
ALARM_POLLING_LOOP_SHORT = 0x142
ALARM_EXPANSION_MOD_FAILURE = 0x143
ALARM_SENSOR_TAMPER = 0x144
ALARM_EXPANSION_MOD_TAMPER = 0x145
BURGLARY_SILENT = 0x146
TROUBLE_SENSOR_SUPERVISION = 0x147
# 148-149: ?
ALARM_AUX = 0x150
ALARM_GAS_DETECTED = 0x151
ALARM_REFRIDGERATION = 0x152
ALARM_LOSS_OF_HEAT = 0x153
ALARM_WATER_LEAKAGE = 0x154
TROUBLE_FOIL_BREAK = 0x155
TROUBLE_DAY_TROUBLE = 0x156
ALARM_LOW_BOTTLED_GAS_LEVEL = 0x157
ALARM_HIGH_TEMP = 0x158
ALARM_LOW_TEMP = 0x159
# 160: ?
ALARM_LOSS_OF_AIR_FLOW = 0x161
ALARM_CARBON_MONOXIDE = 0x162
TROUBLE_TANK_LEVEL = 0x163
# 164-167: ?
TROUBLE_HIGH_HUMIDITY = 0x168
TROUBLE_LOW_HUMIDITY = 0x169
# 170-199: ?
SUPERVISORY_FIRE = 0x200
SUPERVISORY_LOW_PRESSURE = 0x201
SUPERVISORY_LOW_CO2 = 0x202
SUPERVISORY_GATE_VALVE_SENSOR = 0x203
SUPERVISORY_LOW_WATER_LEVEL = 0x204
SUPERVISORY_PUMP_ACTIVATED = 0x205
SUPERVISORY_PUMP_FAILURE = 0x206
# 207-299: ?
TROUBLE_SYSTEM_TROUBLE = 0x300
TROUBLE_AC_LOSS = 0x301
TROUBLE_LOW_BATTERY = 0x302
TROUBLE_RAM_CHECKSUM_BAD = 0x303
TROUBLE_ROM_CHECKSUM_BAD = 0x304
TROUBLE_RESET = 0x305
TROUBLE_PANEL_PROGRAMMING_CHANGED = 0x306
TROUBLE_SELF_TEST_FAILURE = 0x307
TROUBLE_SHUTDOWN = 0x308
TROUBLE_BATTERY_TEST_FAIL = 0x309
TROUBLE_GROUND_FAULT = 0x310
TROUBLE_BATTERY_MISSING = 0x311
TROUBLE_POWER_SUPPLY_OVERCURRENT = 0x312
STATUS_ENGINEER_RESET = 0x313
TROUBLE_PRIMARY_POWER_SUPPLY_FAILURE = 0x314
# 315: ?
TROUBLE_TAMPER = 0x316
# 317-319: ?
TROUBLE_SOUNDER = 0x320
TROUBLE_BELL_1 = 0x321
TROUBLE_BELL_2 = 0x322
TROUBLE_ALARM_RELAY = 0x323
TROUBLE_TROUBLE_RELAY = 0x324
TROUBLE_REVERSING_RELAY = 0x325
TROUBLE_NOTIFICATION_APPLIANCE_CIRCUIT_3 = 0x326
TROUBLE_NOTIFICATION_APPLIANCE_CIRCUIT_4 = 0x327
# 328-329: ?
TROUBLE_SYSTEM_PERIPHERAL = 0x330
TROUBLE_POLLING_LOOP_OPEN = 0x331
TROUBLE_POLLING_LOOP_SHORT = 0x332
TROUBLE_EXPANSION_MODULE_FAILURE = 0x333
TROUBLE_REPEATER_FAILURE = 0x334
TROUBLE_LOCAL_PRINTER_PAPER_OUT = 0x335
TROUBLE_LOCAL_PRINTER_FAILURE = 0x336
TROUBLE_EXPANDER_MODULE_DC_LOSS = 0x337
TROUBLE_EXPANDER_MODULE_LOW_BATTERY = 0x338
TROUBLE_EXPANDER_MODULE_RESET = 0x339
# 340: ?
TROUBLE_EXPANDER_MODULE_TAMPER = 0x341
TROUBLE_EXPANDER_MODULE_AC_LOSS = 0x342
TROUBLE_EXPANDER_MODULE_SELF_TEST_FAIL = 0x343
TROUBLE_RF_RECEIVER_JAM_DETECTED = 0x344
TROUBLE_AES_ENCRYPTION = 0x345
# 346-349: ?
TROUBLE_COMMUNICATION = 0x350
TROUBLE_TELCO_1_FAULT = 0x351
TROUBLE_TELCO_2_FAULT = 0x352
TROUBLE_LRR_TRANSMITTER_FAULT = 0x353
TROUBLE_FAILURE_TO_COMMUNICATE = 0x354
TROUBLE_LOSS_OF_RADIO_SUPERVISION = 0x355
TROUBLE_LOSS_OF_CENTRAL_POLLING = 0x356
TROUBLE_LRR_TRANSMITTER_VSWR = 0x357
TROUBLE_PERIODIC_COMM_TEST = 0x358
# 359-369: ?
TROUBLE_PROTECTION_LOOP = 0x370
TROUBLE_PROTECTION_LOOP_OPEN = 0x371
TROUBLE_PROTECTION_LOOP_SHORT = 0x372
TROUBLE_FIRE = 0x373
TROUBLE_EXIT_ERROR = 0x374
TROUBLE_PANIC_ZONE_TROUBLE = 0x375
TROUBLE_HOLDUP_ZONE_TROUBLE = 0x376
TROUBLE_SWINGER_TROUBLE = 0x377
TROUBLE_CROSS_ZONE_TROUBLE = 0x378
# 379: ?
TROUBLE_SENSOR_TROUBLE = 0x380
TROUBLE_RF_LOSS_OF_SUPERVISION = 0x381
TROUBLE_RPM_LOSS_OF_SUPERVISION = 0x382
TROUBLE_SENSOR_TAMPER = 0x383
TROUBLE_RF_LOW_BATTERY = 0x384
TROUBLE_SMOKE_HI_SENS = 0x385
TROUBLE_SMOKE_LO_SENS = 0x386
TROUBLE_INTRUSION_HI_SENS = 0x387
TROUBLE_INTRUSION_LO_SENS = 0x388
TROUBLE_SELF_TEST_FAIL = 0x389
# 390: ?
TROUBLE_SENSOR_WATCH_FAIL = 0x391
TROUBLE_DRIFT_COMP_ERROR = 0x392
TROUBLE_MAINTENANCE_ALERT = 0x393
# 394-399: ?
OPENCLOSE = 0x400
OPENCLOSE_BY_USER = 0x401
OPENCLOSE_GROUP = 0x402
OPENCLOSE_AUTOMATIC = 0x403
OPENCLOSE_LATE = 0x404
OPENCLOSE_DEFERRED = 0x405
OPENCLOSE_CANCEL_BY_USER = 0x406
OPENCLOSE_REMOTE_ARMDISARM = 0x407
OPENCLOSE_QUICK_ARM = 0x408
OPENCLOSE_KEYSWITCH = 0x409
# 410: ?
REMOTE_CALLBACK_REQUESTED = 0x411
REMOTE_SUCCESS = 0x412
REMOTE_UNSUCCESSFUL = 0x413
REMOTE_SYSTEM_SHUTDOWN = 0x414
REMOTE_DIALER_SHUTDOWN = 0x415
REMOTE_SUCCESSFUL_UPLOAD = 0x416
# 417-420: ?
ACCESS_DENIED = 0x421
ACCESS_REPORT_BY_USER = 0x422
ACCESS_FORCED_ACCESS = 0x423
ACCESS_EGRESS_DENIED = 0x424
ACCESS_EGRESS_GRANTED = 0x425
ACCESS_DOOR_PROPPED_OPEN = 0x426
ACCESS_POINT_DSM_TROUBLE = 0x427
ACCESS_POINT_RTE_TROUBLE = 0x428
ACCESS_PROGRAM_MODE_ENTRY = 0x429
ACCESS_PROGRAM_MODE_EXIT = 0x430
ACCESS_THREAT_LEVEL_CHANGE = 0x431
ACCESS_RELAY_FAIL = 0x432
ACCESS_RTE_SHUNT = 0x433
ACCESS_DSM_SHUNT = 0x434
ACCESS_SECOND_PERSON = 0x435
ACCESS_IRREGULAR_ACCESS = 0x436
# 437-440: ?
OPENCLOSE_ARMED_STAY = 0x441
OPENCLOSE_KEYSWITCH_ARMED_STAY = 0x442
# 443-449: ?
OPENCLOSE_EXCEPTION = 0x450
OPENCLOSE_EARLY = 0x451
OPENCLOSE_LATE = 0x452
TROUBLE_FAILED_TO_OPEN = 0x453
TROUBLE_FAILED_TO_CLOSE = 0x454
TROUBLE_AUTO_ARM_FAILED = 0x455
OPENCLOSE_PARTIAL_ARM = 0x456
OPENCLOSE_EXIT_ERROR = 0x457
OPENCLOSE_USER_ON_PREMISES = 0x458
TROUBLE_RECENT_CLOSE = 0x459
# 460: ?
ACCESS_WRONG_CODE_ENTRY = 0x461
ACCESS_LEGAL_CODE_ENTRY = 0x462
STATUS_REARM_AFTER_ALARM = 0x463
STATUS_AUTO_ARM_TIME_EXTENDED = 0x464
STATUS_PANIC_ALARM_RESET = 0x465
ACCESS_SERVICE_ONOFF_PREMISES = 0x466
# 467-469: ?
OPENCLOSE_PARTIAL_CLOSING = 0x470 # HACK: This is from our DSC firmware implementation,
# and is named far too closely to 0x480.
# 471-479: ?
OPENCLOSE_PARTIAL_CLOSE = 0x480
# 481-500: ?
DISABLE_ACCESS_READER = 0x501
# 502-519: ?
DISABLE_SOUNDER = 0x520
DISABLE_BELL_1 = 0x521
DISABLE_BELL_2 = 0x522
DISABLE_ALARM_RELAY = 0x523
DISABLE_TROUBLE_RELAY = 0x524
DISABLE_REVERSING_RELAY = 0x525
DISABLE_NOTIFICATION_APPLIANCE_CIRCUIT_3 = 0x526
DISABLE_NOTIFICATION_APPLIANCE_CIRCUIT_4 = 0x527
# 528-530: ?
SUPERVISORY_MODULE_ADDED = 0x531
SUPERVISORY_MODULE_REMOVED = 0x532
# 533-550: ?
DISABLE_DIALER = 0x551
DISABLE_RADIO_TRANSMITTER = 0x552
DISABLE_REMOTE_UPLOADDOWNLOAD = 0x553
# 554-569: ?
BYPASS_ZONE = 0x570
BYPASS_FIRE = 0x571
BYPASS_24HOUR_ZONE = 0x572
BYPASS_BURGLARY = 0x573
BYPASS_GROUP = 0x574
BYPASS_SWINGER = 0x575
BYPASS_ACCESS_ZONE_SHUNT = 0x576
BYPASS_ACCESS_POINT_BYPASS = 0x577
BYPASS_ZONE_VAULT = 0x578
BYPASS_ZONE_VENT = 0x579
# 580-600: ?
TEST_MANUAL = 0x601
TEST_PERIODIC = 0x602
TEST_PERIODIC_RF_TRANSMISSION = 0x603
TEST_FIRE = 0x604
TEST_FIRE_STATUS = 0x605
TEST_LISTENIN_TO_FOLLOW = 0x606
TEST_WALK = 0x607
TEST_SYSTEM_TROUBLE_PRESENT = 0x608
TEST_VIDEO_TRANSMITTER_ACTIVE = 0x609
# 610: ?
TEST_POINT_TESTED_OK = 0x611
TEST_POINT_NOT_TESTED = 0x612
TEST_INTRUSION_ZONE_WALK_TESTED = 0x613
TEST_FIRE_ZONE_WALK_TESTED = 0x614
TEST_PANIC_ZONE_WALK_TESTED = 0x615
TROUBLE_SERVICE_REQUEST = 0x616
# 617-620: ?
TROUBLE_EVENT_LOG_RESET = 0x621
TROUBLE_EVENT_LOG_50PERCENT_FULL = 0x622
TROUBLE_EVENT_LOG_90PERCENT_FULL = 0x623
TROUBLE_EVENT_LOG_OVERFLOW = 0x624
TROUBLE_TIMEDATE_RESET = 0x625
TROUBLE_TIMEDATE_INACCURATE = 0x626
TROUBLE_PROGRAM_MODE_ENTRY = 0x627
TROUBLE_PROGRAM_MODE_EXIT = 0x628
TROUBLE_32HOUR_EVENT_LOG_MARKER = 0x629
SCHEDULE_CHANGE = 0x630
SCHEDULE_EXCEPTION_SCHEDULE_CHANGE = 0x631
SCHEDULE_ACCESS_SCHEDULE_CHANGE = 0x632
# 633-640: ?
TROUBLE_SENIOR_WATCH_TROUBLE = 0x641
STATUS_LATCHKEY_SUPERVISION = 0x642
# 643-650: ?
SPECIAL_ADT_AUTHORIZATION = 0x651
RESERVED_652 = 0x652
RESERVED_653 = 0x653
TROUBLE_SYSTEM_INACTIVITY = 0x654
# 750-789: User Assigned
# 790-795: ?
TROUBLE_UNABLE_TO_OUTPUT_SIGNAL = 0x796
# 797: ?
TROUBLE_STU_CONTROLLER_DOWN = 0x798
# 799-899: ?
REMOTE_DOWNLOAD_ABORT = 0x900
REMOTE_DOWNLOAD_STARTEND = 0x901
REMOTE_DOWNLOAD_INTERRUPTED = 0x902
REMOTE_CODE_DOWNLOAD_STARTEND = 0x903
REMOTE_CODE_DOWNLOAD_FAILED = 0x904
# 905-909: ?
OPENCLOSE_AUTOCLOSE_WITH_BYPASS = 0x910
OPENCLOSE_BYPASS_CLOSING = 0x911
EVENT_FIRE_ALARM_SILENCED = 0x912
EVENT_SUPERVISOR_POINT_STARTEND = 0x913
EVENT_HOLDUP_TEST_STARTEND = 0x914
EVENT_BURGLARY_TEST_PRINT_STARTEND = 0x915
EVENT_SUPERVISORY_TEST_PRINT_STARTEND = 0x916
EVENT_BURGLARY_DIAGNOSTICS_STARTEND = 0x917
EVENT_FIRE_DIAGNOSTICS_STARTEND = 0x918
EVENT_UNTYPED_DIAGNOSTICS = 0x919
EVENT_TROUBLE_CLOSING = 0x920
EVENT_ACCESS_DENIED_CODE_UNKNOWN = 0x921
ALARM_SUPERVISORY_POINT = 0x922
EVENT_SUPERVISORY_POINT_BYPASS = 0x923
TROUBLE_SUPERVISORY_POINT = 0x924
EVENT_HOLDUP_POINT_BYPASS = 0x925
EVENT_AC_FAILURE_FOR_4HOURS = 0x926
TROUBLE_OUTPUT = 0x927
EVENT_USER_CODE_FOR_EVENT = 0x928
EVENT_LOG_OFF = 0x929
# 930-953: ?
EVENT_CS_CONNECTION_FAILURE = 0x954
# 955-960: ?
EVENT_RECEIVER_DATABASE_CONNECTION = 0x961
EVENT_LICENSE_EXPIRATION = 0x962
# 963-998: ?
OTHER_NO_READ_LOG = 0x999


class LRR_DSC_EVENT:
"""
DSC event codes
"""
ZONE_EXPANDER_SUPERVISORY_ALARM = 0x04c
ZONE_EXPANDER_SUPERVISORY_RESTORE = 0x04d
AUX_INPUT_ALARM = 0x051
SPECIAL_CLOSING = 0x0bf
CROSS_ZONE_POLICE_CODE_ALARM = 0x103
AUTOMATIC_CLOSING = 0x12b
ZONE_BYPASS = 0x570
REPORT_DSC_USER_LOG_EVENT = 0x800


class LRR_ADEMCO_EVENT:
"""
ADEMCO event codes
"""
pass


class LRR_ALARMDECODER_EVENT:
"""
AlarmDecoder event codes
"""
CUSTOM_PROG_MSG = 0x0
CUSTOM_PROG_KEY = 0x1


class LRR_UNKNOWN_EVENT:
"""
Unknown event codes. Realistically there shouldn't ever be anything here.
"""
pass


# Map of ContactID event codes to human-readable text.
LRR_CID_MAP = {
LRR_CID_EVENT.MEDICAL: 'Medical Emergency: Non-specific',
LRR_CID_EVENT.MEDICAL_PENDANT: 'Emergency Assistance Request',
LRR_CID_EVENT.MEDICAL_FAIL_TO_REPORT: 'Medical: Failed to activate monitoring device',
LRR_CID_EVENT.TAMPER_ZONE: 'Zone Tamper',
LRR_CID_EVENT.FIRE: 'Fire: Non-specific',
LRR_CID_EVENT.FIRE_SMOKE: 'Fire: Smoke Alarm',
LRR_CID_EVENT.FIRE_COMBUSTION: 'Fire: Combustion',
LRR_CID_EVENT.FIRE_WATER_FLOW: 'Fire: Water Flow',
LRR_CID_EVENT.FIRE_HEAT: 'Fire: Heat',
LRR_CID_EVENT.FIRE_PULL_STATION: 'Fire: Pull Station',
LRR_CID_EVENT.FIRE_DUCT: 'Fire: Duct',
LRR_CID_EVENT.FIRE_FLAME: 'Fire: Flame',
LRR_CID_EVENT.FIRE_NEAR_ALARM: 'Fire: Near Alarm',
LRR_CID_EVENT.PANIC: 'Panic',
LRR_CID_EVENT.PANIC_DURESS: 'Panic: Duress',
LRR_CID_EVENT.PANIC_SILENT: 'Panic: Silent',
LRR_CID_EVENT.PANIC_AUDIBLE: 'Panic: Audible',
LRR_CID_EVENT.PANIC_DURESS_ACCESS_GRANTED: 'Fire: Duress',
LRR_CID_EVENT.PANIC_DURESS_EGRESS_GRANTED: 'Fire: Egress',
LRR_CID_EVENT.PANIC_HOLDUP_SUSPICION: 'Panic: Hold-up, Suspicious Condition',
LRR_CID_EVENT.PANIC_HOLDUP_VERIFIER: 'Panic: Hold-up Verified',
LRR_CID_EVENT.BURGLARY: 'Burglary',
LRR_CID_EVENT.BURGLARY_PERIMETER: 'Burglary: Perimeter',
LRR_CID_EVENT.BURGLARY_INTERIOR: 'Burglary: Interior',
LRR_CID_EVENT.BURGLARY_AUX: 'Burglary: 24 Hour',
LRR_CID_EVENT.BURGLARY_ENTRYEXIT: 'Burglary: Entry/Exit',
LRR_CID_EVENT.BURGLARY_DAYNIGHT: 'Burglary: Day/Night',
LRR_CID_EVENT.BURGLARY_OUTDOOR: 'Burglary: Outdoor',
LRR_CID_EVENT.BURGLARY_TAMPER: 'Burglary: Tamper',
LRR_CID_EVENT.BURGLARY_NEAR_ALARM: 'Burglary: Near Alarm',
LRR_CID_EVENT.BURGLARY_INTRUSION_VERIFIER: 'Burglary: Intrusion Verifier',
LRR_CID_EVENT.ALARM_GENERAL: 'Alarm: General',
LRR_CID_EVENT.ALARM_POLLING_LOOP_OPEN: 'Alarm: Polling Loop Open',
LRR_CID_EVENT.ALARM_POLLING_LOOP_SHORT: 'Alarm: Polling Loop Closed',
LRR_CID_EVENT.ALARM_EXPANSION_MOD_FAILURE: 'Alarm: Expansion Module Failure',
LRR_CID_EVENT.ALARM_SENSOR_TAMPER: 'Alarm: Sensor Tamper',
LRR_CID_EVENT.ALARM_EXPANSION_MOD_TAMPER: 'Alarm: Expansion Module Tamper',
LRR_CID_EVENT.BURGLARY_SILENT: 'Burglary: Silent',
LRR_CID_EVENT.TROUBLE_SENSOR_SUPERVISION: 'Trouble: Sensor Supervision Failure',
LRR_CID_EVENT.ALARM_AUX: 'Alarm: 24 Hour Non-Burglary',
LRR_CID_EVENT.ALARM_GAS_DETECTED: 'Alarm: Gas Detected',
LRR_CID_EVENT.ALARM_REFRIDGERATION: 'Alarm: Refridgeration',
LRR_CID_EVENT.ALARM_LOSS_OF_HEAT: 'Alarm: Loss of Heat',
LRR_CID_EVENT.ALARM_WATER_LEAKAGE: 'Alarm: Water Leakage',
LRR_CID_EVENT.TROUBLE_FOIL_BREAK: 'Trouble: Foil Break',
LRR_CID_EVENT.TROUBLE_DAY_TROUBLE: 'Trouble: Day Trouble',
LRR_CID_EVENT.ALARM_LOW_BOTTLED_GAS_LEVEL: 'Alarm: Low Bottled Gas Level',
LRR_CID_EVENT.ALARM_HIGH_TEMP: 'Alarm: High Temperature',
LRR_CID_EVENT.ALARM_LOW_TEMP: 'Alarm: Low Temperature',
LRR_CID_EVENT.ALARM_LOSS_OF_AIR_FLOW: 'Alarm: Loss of Air Flow',
LRR_CID_EVENT.ALARM_CARBON_MONOXIDE: 'Alarm: Carbon Monoxide',
LRR_CID_EVENT.TROUBLE_TANK_LEVEL: 'Trouble: Tank Level',
LRR_CID_EVENT.TROUBLE_HIGH_HUMIDITY: 'Trouble: High Humidity',
LRR_CID_EVENT.TROUBLE_LOW_HUMIDITY: 'Trouble: Low Humidity',
LRR_CID_EVENT.SUPERVISORY_FIRE: 'Supervisory: Fire',
LRR_CID_EVENT.SUPERVISORY_LOW_PRESSURE: 'Supervisory: Low Water Pressure',
LRR_CID_EVENT.SUPERVISORY_LOW_CO2: 'Supervisory: Low CO2',
LRR_CID_EVENT.SUPERVISORY_GATE_VALVE_SENSOR: 'Supervisory: Gate Valve Sensor',
LRR_CID_EVENT.SUPERVISORY_LOW_WATER_LEVEL: 'Supervisory: Low Water Level',
LRR_CID_EVENT.SUPERVISORY_PUMP_ACTIVATED: 'Supervisory: Pump Activated',
LRR_CID_EVENT.SUPERVISORY_PUMP_FAILURE: 'Supervisory: Pump Failure',
LRR_CID_EVENT.TROUBLE_SYSTEM_TROUBLE: 'Trouble: System Trouble',
LRR_CID_EVENT.TROUBLE_AC_LOSS: 'Trouble: AC Loss',
LRR_CID_EVENT.TROUBLE_LOW_BATTERY: 'Trouble: Low Battery',
LRR_CID_EVENT.TROUBLE_RAM_CHECKSUM_BAD: 'Trouble: RAM Checksum Bad',
LRR_CID_EVENT.TROUBLE_ROM_CHECKSUM_BAD: 'Trouble: ROM Checksum Bad',
LRR_CID_EVENT.TROUBLE_RESET: 'Trouble: System Reset',
LRR_CID_EVENT.TROUBLE_PANEL_PROGRAMMING_CHANGED: 'Trouble: Panel Programming Changed',
LRR_CID_EVENT.TROUBLE_SELF_TEST_FAILURE: 'Trouble: Self-Test Failure',
LRR_CID_EVENT.TROUBLE_SHUTDOWN: 'Trouble: System Shutdown',
LRR_CID_EVENT.TROUBLE_BATTERY_TEST_FAIL: 'Trouble: Battery Test Failure',
LRR_CID_EVENT.TROUBLE_GROUND_FAULT: 'Trouble: Ground Fault',
LRR_CID_EVENT.TROUBLE_BATTERY_MISSING: 'Trouble: Battery Missing',
LRR_CID_EVENT.TROUBLE_POWER_SUPPLY_OVERCURRENT: 'Trouble: Power Supply Overcurrent',
LRR_CID_EVENT.STATUS_ENGINEER_RESET: 'Status: Engineer Reset',
LRR_CID_EVENT.TROUBLE_PRIMARY_POWER_SUPPLY_FAILURE: 'Trouble: Primary Power Supply Failure',
LRR_CID_EVENT.TROUBLE_TAMPER: 'Trouble: System Tamper',
LRR_CID_EVENT.TROUBLE_SOUNDER: 'Trouble: Sounder',
LRR_CID_EVENT.TROUBLE_BELL_1: 'Trouble: Bell 1',
LRR_CID_EVENT.TROUBLE_BELL_2: 'Trouble: Bell 2',
LRR_CID_EVENT.TROUBLE_ALARM_RELAY: 'Trouble: Alarm Relay',
LRR_CID_EVENT.TROUBLE_TROUBLE_RELAY: 'Trouble: Trouble Relay',
LRR_CID_EVENT.TROUBLE_REVERSING_RELAY: 'Trouble: Reversing Relay',
LRR_CID_EVENT.TROUBLE_NOTIFICATION_APPLIANCE_CIRCUIT_3: 'Trouble: Notification Appliance Circuit #3',
LRR_CID_EVENT.TROUBLE_NOTIFICATION_APPLIANCE_CIRCUIT_4: 'Trouble: Notification Appliance Circuit #3',
LRR_CID_EVENT.TROUBLE_SYSTEM_PERIPHERAL: 'Trouble: System Peripheral',
LRR_CID_EVENT.TROUBLE_POLLING_LOOP_OPEN: 'Trouble: Pooling Loop Open',
LRR_CID_EVENT.TROUBLE_POLLING_LOOP_SHORT: 'Trouble: Polling Loop Short',
LRR_CID_EVENT.TROUBLE_EXPANSION_MODULE_FAILURE: 'Trouble: Expansion Module Failure',
LRR_CID_EVENT.TROUBLE_REPEATER_FAILURE: 'Trouble: Repeater Failure',
LRR_CID_EVENT.TROUBLE_LOCAL_PRINTER_PAPER_OUT: 'Trouble: Local Printer Out Of Paper',
LRR_CID_EVENT.TROUBLE_LOCAL_PRINTER_FAILURE: 'Trouble: Local Printer Failure',
LRR_CID_EVENT.TROUBLE_EXPANDER_MODULE_DC_LOSS: 'Trouble: Expander Module, DC Power Loss',
LRR_CID_EVENT.TROUBLE_EXPANDER_MODULE_LOW_BATTERY: 'Trouble: Expander Module, Low Battery',
LRR_CID_EVENT.TROUBLE_EXPANDER_MODULE_RESET: 'Trouble: Expander Module, Reset',
LRR_CID_EVENT.TROUBLE_EXPANDER_MODULE_TAMPER: 'Trouble: Expander Module, Tamper',
LRR_CID_EVENT.TROUBLE_EXPANDER_MODULE_AC_LOSS: 'Trouble: Expander Module, AC Power Loss',
LRR_CID_EVENT.TROUBLE_EXPANDER_MODULE_SELF_TEST_FAIL: 'Trouble: Expander Module, Self-test Failure',
LRR_CID_EVENT.TROUBLE_RF_RECEIVER_JAM_DETECTED: 'Trouble: RF Receiver Jam Detected',
LRR_CID_EVENT.TROUBLE_AES_ENCRYPTION: 'Trouble: AES Encryption',
LRR_CID_EVENT.TROUBLE_COMMUNICATION: 'Trouble: Communication',
LRR_CID_EVENT.TROUBLE_TELCO_1_FAULT: 'Trouble: Telco 1',
LRR_CID_EVENT.TROUBLE_TELCO_2_FAULT: 'Trouble: Telco 2',
LRR_CID_EVENT.TROUBLE_LRR_TRANSMITTER_FAULT: 'Trouble: Long Range Radio Transmitter Fault',
LRR_CID_EVENT.TROUBLE_FAILURE_TO_COMMUNICATE: 'Trouble: Failure To Communicate',
LRR_CID_EVENT.TROUBLE_LOSS_OF_RADIO_SUPERVISION: 'Trouble: Loss of Radio Supervision',
LRR_CID_EVENT.TROUBLE_LOSS_OF_CENTRAL_POLLING: 'Trouble: Loss of Central Polling',
LRR_CID_EVENT.TROUBLE_LRR_TRANSMITTER_VSWR: 'Trouble: Long Range Radio Transmitter/Antenna',
LRR_CID_EVENT.TROUBLE_PERIODIC_COMM_TEST: 'Trouble: Periodic Communication Test',
LRR_CID_EVENT.TROUBLE_PROTECTION_LOOP: 'Trouble: Protection Loop',
LRR_CID_EVENT.TROUBLE_PROTECTION_LOOP_OPEN: 'Trouble: Protection Loop Open',
LRR_CID_EVENT.TROUBLE_PROTECTION_LOOP_SHORT: 'Trouble: Protection Loop Short',
LRR_CID_EVENT.TROUBLE_FIRE: 'Trouble: Fire',
LRR_CID_EVENT.TROUBLE_EXIT_ERROR: 'Trouble: Exit Error',
LRR_CID_EVENT.TROUBLE_PANIC_ZONE_TROUBLE: 'Trouble: Panic',
LRR_CID_EVENT.TROUBLE_HOLDUP_ZONE_TROUBLE: 'Trouble: Hold-up',
LRR_CID_EVENT.TROUBLE_SWINGER_TROUBLE: 'Trouble: Swinger',
LRR_CID_EVENT.TROUBLE_CROSS_ZONE_TROUBLE: 'Trouble: Cross-zone',
LRR_CID_EVENT.TROUBLE_SENSOR_TROUBLE: 'Trouble: Sensor',
LRR_CID_EVENT.TROUBLE_RF_LOSS_OF_SUPERVISION: 'Trouble: RF Loss of Supervision',
LRR_CID_EVENT.TROUBLE_RPM_LOSS_OF_SUPERVISION: 'Trouble: RPM Loss of Supervision',
LRR_CID_EVENT.TROUBLE_SENSOR_TAMPER: 'Trouble: Sensor Tamper',
LRR_CID_EVENT.TROUBLE_RF_LOW_BATTERY: 'Trouble: RF Low Battery',
LRR_CID_EVENT.TROUBLE_SMOKE_HI_SENS: 'Trouble: Smoke Detector, High Sensitivity',
LRR_CID_EVENT.TROUBLE_SMOKE_LO_SENS: 'Trouble: Smoke Detector, Low Sensitivity',
LRR_CID_EVENT.TROUBLE_INTRUSION_HI_SENS: 'Trouble: Intrusion Detector, High Sensitivity',
LRR_CID_EVENT.TROUBLE_INTRUSION_LO_SENS: 'Trouble: Intrusion Detector, Low Sensitivity',
LRR_CID_EVENT.TROUBLE_SELF_TEST_FAIL: 'Trouble: Self-test Failure',
LRR_CID_EVENT.TROUBLE_SENSOR_WATCH_FAIL: 'Trouble: Sensor Watch',
LRR_CID_EVENT.TROUBLE_DRIFT_COMP_ERROR: 'Trouble: Drift Compensation Error',
LRR_CID_EVENT.TROUBLE_MAINTENANCE_ALERT: 'Trouble: Maintenance Alert',
LRR_CID_EVENT.OPENCLOSE: 'Open/Close',
LRR_CID_EVENT.OPENCLOSE_BY_USER: 'Open/Close: By User',
LRR_CID_EVENT.OPENCLOSE_GROUP: 'Open/Close: Group',
LRR_CID_EVENT.OPENCLOSE_AUTOMATIC: 'Open/Close: Automatic',
LRR_CID_EVENT.OPENCLOSE_LATE: 'Open/Close: Late',
LRR_CID_EVENT.OPENCLOSE_DEFERRED: 'Open/Close: Deferred',
LRR_CID_EVENT.OPENCLOSE_CANCEL_BY_USER: 'Open/Close: Cancel',
LRR_CID_EVENT.OPENCLOSE_REMOTE_ARMDISARM: 'Open/Close: Remote',
LRR_CID_EVENT.OPENCLOSE_QUICK_ARM: 'Open/Close: Quick Arm',
LRR_CID_EVENT.OPENCLOSE_KEYSWITCH: 'Open/Close: Keyswitch',
LRR_CID_EVENT.REMOTE_CALLBACK_REQUESTED: 'Remote: Callback Requested',
LRR_CID_EVENT.REMOTE_SUCCESS: 'Remote: Successful Access',
LRR_CID_EVENT.REMOTE_UNSUCCESSFUL: 'Remote: Unsuccessful Access',
LRR_CID_EVENT.REMOTE_SYSTEM_SHUTDOWN: 'Remote: System Shutdown',
LRR_CID_EVENT.REMOTE_DIALER_SHUTDOWN: 'Remote: Dialer Shutdown',
LRR_CID_EVENT.REMOTE_SUCCESSFUL_UPLOAD: 'Remote: Successful Upload',
LRR_CID_EVENT.ACCESS_DENIED: 'Access: Denied',
LRR_CID_EVENT.ACCESS_REPORT_BY_USER: 'Access: Report By User',
LRR_CID_EVENT.ACCESS_FORCED_ACCESS: 'Access: Forced Access',
LRR_CID_EVENT.ACCESS_EGRESS_DENIED: 'Access: Egress Denied',
LRR_CID_EVENT.ACCESS_EGRESS_GRANTED: 'Access: Egress Granted',
LRR_CID_EVENT.ACCESS_DOOR_PROPPED_OPEN: 'Access: Door Propped Open',
LRR_CID_EVENT.ACCESS_POINT_DSM_TROUBLE: 'Access: Door Status Monitor Trouble',
LRR_CID_EVENT.ACCESS_POINT_RTE_TROUBLE: 'Access: Request To Exit Trouble',
LRR_CID_EVENT.ACCESS_PROGRAM_MODE_ENTRY: 'Access: Program Mode Entry',
LRR_CID_EVENT.ACCESS_PROGRAM_MODE_EXIT: 'Access: Program Mode Exit',
LRR_CID_EVENT.ACCESS_THREAT_LEVEL_CHANGE: 'Access: Threat Level Change',
LRR_CID_EVENT.ACCESS_RELAY_FAIL: 'Access: Relay Fail',
LRR_CID_EVENT.ACCESS_RTE_SHUNT: 'Access: Request to Exit Shunt',
LRR_CID_EVENT.ACCESS_DSM_SHUNT: 'Access: Door Status Monitor Shunt',
LRR_CID_EVENT.ACCESS_SECOND_PERSON: 'Access: Second Person Access',
LRR_CID_EVENT.ACCESS_IRREGULAR_ACCESS: 'Access: Irregular Access',
LRR_CID_EVENT.OPENCLOSE_ARMED_STAY: 'Open/Close: Armed Stay',
LRR_CID_EVENT.OPENCLOSE_KEYSWITCH_ARMED_STAY: 'Open/Close: Keyswitch, Armed Stay',
LRR_CID_EVENT.OPENCLOSE_EXCEPTION: 'Open/Close: Armed with Trouble Override',
LRR_CID_EVENT.OPENCLOSE_EARLY: 'Open/Close: Early',
LRR_CID_EVENT.OPENCLOSE_LATE: 'Open/Close: Late',
LRR_CID_EVENT.TROUBLE_FAILED_TO_OPEN: 'Trouble: Failed To Open',
LRR_CID_EVENT.TROUBLE_FAILED_TO_CLOSE: 'Trouble: Failed To Close',
LRR_CID_EVENT.TROUBLE_AUTO_ARM_FAILED: 'Trouble: Auto Arm Failed',
LRR_CID_EVENT.OPENCLOSE_PARTIAL_ARM: 'Open/Close: Partial Arm',
LRR_CID_EVENT.OPENCLOSE_EXIT_ERROR: 'Open/Close: Exit Error',
LRR_CID_EVENT.OPENCLOSE_USER_ON_PREMISES: 'Open/Close: User On Premises',
LRR_CID_EVENT.TROUBLE_RECENT_CLOSE: 'Trouble: Recent Close',
LRR_CID_EVENT.ACCESS_WRONG_CODE_ENTRY: 'Access: Wrong Code',
LRR_CID_EVENT.ACCESS_LEGAL_CODE_ENTRY: 'Access: Legal Code',
LRR_CID_EVENT.STATUS_REARM_AFTER_ALARM: 'Status: Re-arm After Alarm',
LRR_CID_EVENT.STATUS_AUTO_ARM_TIME_EXTENDED: 'Status: Auto-arm Time Extended',
LRR_CID_EVENT.STATUS_PANIC_ALARM_RESET: 'Status: Panic Alarm Reset',
LRR_CID_EVENT.ACCESS_SERVICE_ONOFF_PREMISES: 'Status: Service On/Off Premises',
LRR_CID_EVENT.OPENCLOSE_PARTIAL_CLOSING: 'Open/Close: Partial Closing',
LRR_CID_EVENT.OPENCLOSE_PARTIAL_CLOSE: 'Open/Close: Partial Close',
LRR_CID_EVENT.DISABLE_ACCESS_READER: 'Disable: Access Reader',
LRR_CID_EVENT.DISABLE_SOUNDER: 'Disable: Sounder',
LRR_CID_EVENT.DISABLE_BELL_1: 'Disable: Bell 1',
LRR_CID_EVENT.DISABLE_BELL_2: 'Disable: Bell 2',
LRR_CID_EVENT.DISABLE_ALARM_RELAY: 'Disable: Alarm Relay',
LRR_CID_EVENT.DISABLE_TROUBLE_RELAY: 'Disable: Trouble Relay',
LRR_CID_EVENT.DISABLE_REVERSING_RELAY: 'Disable: Reversing Relay',
LRR_CID_EVENT.DISABLE_NOTIFICATION_APPLIANCE_CIRCUIT_3: 'Disable: Notification Appliance Circuit #3',
LRR_CID_EVENT.DISABLE_NOTIFICATION_APPLIANCE_CIRCUIT_4: 'Disable: Notification Appliance Circuit #4',
LRR_CID_EVENT.SUPERVISORY_MODULE_ADDED: 'Supervisory: Module Added',
LRR_CID_EVENT.SUPERVISORY_MODULE_REMOVED: 'Supervisory: Module Removed',
LRR_CID_EVENT.DISABLE_DIALER: 'Disable: Dialer',
LRR_CID_EVENT.DISABLE_RADIO_TRANSMITTER: 'Disable: Radio Transmitter',
LRR_CID_EVENT.DISABLE_REMOTE_UPLOADDOWNLOAD: 'Disable: Remote Upload/Download',
LRR_CID_EVENT.BYPASS_ZONE: 'Bypass: Zone',
LRR_CID_EVENT.BYPASS_FIRE: 'Bypass: Fire',
LRR_CID_EVENT.BYPASS_24HOUR_ZONE: 'Bypass: 24 Hour Zone',
LRR_CID_EVENT.BYPASS_BURGLARY: 'Bypass: Burglary',
LRR_CID_EVENT.BYPASS_GROUP: 'Bypass: Group',
LRR_CID_EVENT.BYPASS_SWINGER: 'Bypass: Swinger',
LRR_CID_EVENT.BYPASS_ACCESS_ZONE_SHUNT: 'Bypass: Access Zone Shunt',
LRR_CID_EVENT.BYPASS_ACCESS_POINT_BYPASS: 'Bypass: Access Point',
LRR_CID_EVENT.BYPASS_ZONE_VAULT: 'Bypass: Vault',
LRR_CID_EVENT.BYPASS_ZONE_VENT: 'Bypass: Vent',
LRR_CID_EVENT.TEST_MANUAL: 'Test: Manual Trigger',
LRR_CID_EVENT.TEST_PERIODIC: 'Test: Periodic',
LRR_CID_EVENT.TEST_PERIODIC_RF_TRANSMISSION: 'Test: Periodic RF Transmission',
LRR_CID_EVENT.TEST_FIRE: 'Test: Fire',
LRR_CID_EVENT.TEST_FIRE_STATUS: 'Test: Fire, Status Report To Follow',
LRR_CID_EVENT.TEST_LISTENIN_TO_FOLLOW: 'Test: Listen-in To Follow',
LRR_CID_EVENT.TEST_WALK: 'Test: Walk',
LRR_CID_EVENT.TEST_SYSTEM_TROUBLE_PRESENT: 'Test: Periodic Test, System Trouble Present',
LRR_CID_EVENT.TEST_VIDEO_TRANSMITTER_ACTIVE: 'Test: Video Transmitter Active',
LRR_CID_EVENT.TEST_POINT_TESTED_OK: 'Test: Point Tested OK',
LRR_CID_EVENT.TEST_POINT_NOT_TESTED: 'Test: Point Not Tested',
LRR_CID_EVENT.TEST_INTRUSION_ZONE_WALK_TESTED: 'Test: Intrusion Zone Walk Tested',
LRR_CID_EVENT.TEST_FIRE_ZONE_WALK_TESTED: 'Test: Fire Zone Walk Tested',
LRR_CID_EVENT.TEST_PANIC_ZONE_WALK_TESTED: 'Test: Panic Zone Walk Tested',
LRR_CID_EVENT.TROUBLE_SERVICE_REQUEST: 'Trouble: Service Request',
LRR_CID_EVENT.TROUBLE_EVENT_LOG_RESET: 'Trouble: Event Log Reset',
LRR_CID_EVENT.TROUBLE_EVENT_LOG_50PERCENT_FULL: 'Trouble: Event Log 50% Full',
LRR_CID_EVENT.TROUBLE_EVENT_LOG_90PERCENT_FULL: 'Trouble: Event Log 90% Full',
LRR_CID_EVENT.TROUBLE_EVENT_LOG_OVERFLOW: 'Trouble: Event Log Overflow',
LRR_CID_EVENT.TROUBLE_TIMEDATE_RESET: 'Trouble: Time/Date Reset',
LRR_CID_EVENT.TROUBLE_TIMEDATE_INACCURATE: 'Trouble: Time/Date Inaccurate',
LRR_CID_EVENT.TROUBLE_PROGRAM_MODE_ENTRY: 'Trouble: Program Mode Entry',
LRR_CID_EVENT.TROUBLE_PROGRAM_MODE_EXIT: 'Trouble: Program Mode Exit',
LRR_CID_EVENT.TROUBLE_32HOUR_EVENT_LOG_MARKER: 'Trouble: 32 Hour Event Log Marker',
LRR_CID_EVENT.SCHEDULE_CHANGE: 'Schedule: Change',
LRR_CID_EVENT.SCHEDULE_EXCEPTION_SCHEDULE_CHANGE: 'Schedule: Exception Schedule Change',
LRR_CID_EVENT.SCHEDULE_ACCESS_SCHEDULE_CHANGE: 'Schedule: Access Schedule Change',
LRR_CID_EVENT.TROUBLE_SENIOR_WATCH_TROUBLE: 'Schedule: Senior Watch Trouble',
LRR_CID_EVENT.STATUS_LATCHKEY_SUPERVISION: 'Status: Latch-key Supervision',
LRR_CID_EVENT.SPECIAL_ADT_AUTHORIZATION: 'Special: ADT Authorization',
LRR_CID_EVENT.RESERVED_652: 'Reserved: For Ademco Use',
LRR_CID_EVENT.RESERVED_652: 'Reserved: For Ademco Use',
LRR_CID_EVENT.TROUBLE_SYSTEM_INACTIVITY: 'Trouble: System Inactivity',
LRR_CID_EVENT.TROUBLE_UNABLE_TO_OUTPUT_SIGNAL: 'Trouble: Unable To Output Signal (Derived Channel)',
LRR_CID_EVENT.TROUBLE_STU_CONTROLLER_DOWN: 'Trouble: STU Controller Down (Derived Channel)',
LRR_CID_EVENT.REMOTE_DOWNLOAD_ABORT: 'Remote: Download Aborted',
LRR_CID_EVENT.REMOTE_DOWNLOAD_STARTEND: 'Remote: Download Start/End',
LRR_CID_EVENT.REMOTE_DOWNLOAD_INTERRUPTED: 'Remote: Download Interrupted',
LRR_CID_EVENT.REMOTE_CODE_DOWNLOAD_STARTEND: 'Remote: Device Flash Start/End',
LRR_CID_EVENT.REMOTE_CODE_DOWNLOAD_FAILED: 'Remote: Device Flash Failed',
LRR_CID_EVENT.OPENCLOSE_AUTOCLOSE_WITH_BYPASS: 'Open/Close: Auto-Close With Bypass',
LRR_CID_EVENT.OPENCLOSE_BYPASS_CLOSING: 'Open/Close: Bypass Closing',
LRR_CID_EVENT.EVENT_FIRE_ALARM_SILENCED: 'Event: Fire Alarm Silenced',
LRR_CID_EVENT.EVENT_SUPERVISOR_POINT_STARTEND: 'Event: Supervisory Point Test Start/End',
LRR_CID_EVENT.EVENT_HOLDUP_TEST_STARTEND: 'Event: Hold-up Test Start/End',
LRR_CID_EVENT.EVENT_BURGLARY_TEST_PRINT_STARTEND: 'Event: Burglary Test Print Start/End',
LRR_CID_EVENT.EVENT_SUPERVISORY_TEST_PRINT_STARTEND: 'Event: Supervisory Test Print Start/End',
LRR_CID_EVENT.EVENT_BURGLARY_DIAGNOSTICS_STARTEND: 'Event: Burglary Diagnostics Start/End',
LRR_CID_EVENT.EVENT_FIRE_DIAGNOSTICS_STARTEND: 'Event: Fire Diagnostics Start/End',
LRR_CID_EVENT.EVENT_UNTYPED_DIAGNOSTICS: 'Event: Untyped Diagnostics',
LRR_CID_EVENT.EVENT_TROUBLE_CLOSING: 'Event: Trouble Closing',
LRR_CID_EVENT.EVENT_ACCESS_DENIED_CODE_UNKNOWN: 'Event: Access Denied, Code Unknown',
LRR_CID_EVENT.ALARM_SUPERVISORY_POINT: 'Alarm: Supervisory Point',
LRR_CID_EVENT.EVENT_SUPERVISORY_POINT_BYPASS: 'Event: Supervisory Point Bypass',
LRR_CID_EVENT.TROUBLE_SUPERVISORY_POINT: 'Trouble: Supervisory Point',
LRR_CID_EVENT.EVENT_HOLDUP_POINT_BYPASS: 'Event: Hold-up Point Bypass',
LRR_CID_EVENT.EVENT_AC_FAILURE_FOR_4HOURS: 'Event: AC Failure For 4 Hours',
LRR_CID_EVENT.TROUBLE_OUTPUT: 'Trouble: Output Trouble',
LRR_CID_EVENT.EVENT_USER_CODE_FOR_EVENT: 'Event: User Code For Event',
LRR_CID_EVENT.EVENT_LOG_OFF: 'Event: Log-off',
LRR_CID_EVENT.EVENT_CS_CONNECTION_FAILURE: 'Event: Central Station Connection Failure',
LRR_CID_EVENT.EVENT_RECEIVER_DATABASE_CONNECTION: 'Event: Receiver Database Connection',
LRR_CID_EVENT.EVENT_LICENSE_EXPIRATION: 'Event: License Expiration',
LRR_CID_EVENT.OTHER_NO_READ_LOG: 'Other: No Read Log',
}

# Map of DSC event codes to human-readable text.
LRR_DSC_MAP = {
LRR_DSC_EVENT.ZONE_EXPANDER_SUPERVISORY_ALARM: 'Zone Expander Supervisory Alarm',
LRR_DSC_EVENT.ZONE_EXPANDER_SUPERVISORY_RESTORE: 'Zone Expander Supervisory Restore',
LRR_DSC_EVENT.AUX_INPUT_ALARM: 'Auxillary Input Alarm',
LRR_DSC_EVENT.SPECIAL_CLOSING: 'Special Closing',
LRR_DSC_EVENT.CROSS_ZONE_POLICE_CODE_ALARM: 'Cross-zone Police Code Alarm',
LRR_DSC_EVENT.AUTOMATIC_CLOSING: 'Automatic Closing',
LRR_DSC_EVENT.ZONE_BYPASS: 'Zone Bypass',
LRR_DSC_EVENT.REPORT_DSC_USER_LOG_EVENT: 'Report DSC User Log Event',
}

# Map of ADEMCO event codes to human-readable text.
LRR_ADEMCO_MAP = {

}

LRR_ALARMDECODER_MAP = {
LRR_ALARMDECODER_EVENT.CUSTOM_PROG_MSG: 'Custom Programming Message',
LRR_ALARMDECODER_EVENT.CUSTOM_PROG_KEY: 'Custom Programming Key'
}

# Map of UNKNOWN event codes to human-readable text.
LRR_UNKNOWN_MAP = {

}

# Map of event type codes to text maps.
LRR_TYPE_MAP = {
LRR_EVENT_TYPE.CID: LRR_CID_MAP,
LRR_EVENT_TYPE.DSC: LRR_DSC_MAP,
LRR_EVENT_TYPE.ADEMCO: LRR_ADEMCO_MAP,
LRR_EVENT_TYPE.ALARMDECODER: LRR_ALARMDECODER_MAP,
LRR_EVENT_TYPE.UNKNOWN: LRR_UNKNOWN_MAP,
}

# LRR events that should be considered Fire events.
LRR_FIRE_EVENTS = [
LRR_CID_EVENT.FIRE,
LRR_CID_EVENT.FIRE_SMOKE,
LRR_CID_EVENT.FIRE_COMBUSTION,
LRR_CID_EVENT.FIRE_WATER_FLOW,
LRR_CID_EVENT.FIRE_HEAT,
LRR_CID_EVENT.FIRE_PULL_STATION,
LRR_CID_EVENT.FIRE_DUCT,
LRR_CID_EVENT.FIRE_FLAME,
LRR_CID_EVENT.OPENCLOSE_CANCEL_BY_USER # HACK: Don't really like having this here
]

# LRR events that should be considered Alarm events.
LRR_ALARM_EVENTS = [
LRR_CID_EVENT.BURGLARY,
LRR_CID_EVENT.BURGLARY_PERIMETER,
LRR_CID_EVENT.BURGLARY_INTERIOR,
LRR_CID_EVENT.BURGLARY_AUX,
LRR_CID_EVENT.BURGLARY_ENTRYEXIT,
LRR_CID_EVENT.BURGLARY_DAYNIGHT,
LRR_CID_EVENT.BURGLARY_OUTDOOR,
LRR_CID_EVENT.ALARM_GENERAL,
LRR_CID_EVENT.BURGLARY_SILENT,
LRR_CID_EVENT.ALARM_AUX,
LRR_CID_EVENT.ALARM_GAS_DETECTED,
LRR_CID_EVENT.ALARM_REFRIDGERATION,
LRR_CID_EVENT.ALARM_LOSS_OF_HEAT,
LRR_CID_EVENT.ALARM_WATER_LEAKAGE,
LRR_CID_EVENT.ALARM_LOW_BOTTLED_GAS_LEVEL,
LRR_CID_EVENT.ALARM_HIGH_TEMP,
LRR_CID_EVENT.ALARM_LOW_TEMP,
LRR_CID_EVENT.ALARM_LOSS_OF_AIR_FLOW,
LRR_CID_EVENT.ALARM_CARBON_MONOXIDE,
LRR_CID_EVENT.OPENCLOSE_CANCEL_BY_USER # HACK: Don't really like having this here
]

# LRR events that should be considered Power events.
LRR_POWER_EVENTS = [
LRR_CID_EVENT.TROUBLE_AC_LOSS
]

# LRR events that should be considered Bypass events.
LRR_BYPASS_EVENTS = [
LRR_CID_EVENT.BYPASS_ZONE,
LRR_CID_EVENT.BYPASS_24HOUR_ZONE,
LRR_CID_EVENT.BYPASS_BURGLARY
]

# LRR events that should be considered Battery events.
LRR_BATTERY_EVENTS = [
LRR_CID_EVENT.TROUBLE_LOW_BATTERY
]

# LRR events that should be considered Panic events.
LRR_PANIC_EVENTS = [
LRR_CID_EVENT.MEDICAL,
LRR_CID_EVENT.MEDICAL_PENDANT,
LRR_CID_EVENT.MEDICAL_FAIL_TO_REPORT,
LRR_CID_EVENT.PANIC,
LRR_CID_EVENT.PANIC_DURESS,
LRR_CID_EVENT.PANIC_SILENT,
LRR_CID_EVENT.PANIC_AUDIBLE,
LRR_CID_EVENT.PANIC_DURESS_ACCESS_GRANTED,
LRR_CID_EVENT.PANIC_DURESS_EGRESS_GRANTED,
LRR_CID_EVENT.OPENCLOSE_CANCEL_BY_USER # HACK: Don't really like having this here
]

# LRR events that should be considered Arm events.
LRR_ARM_EVENTS = [
LRR_CID_EVENT.OPENCLOSE,
LRR_CID_EVENT.OPENCLOSE_BY_USER,
LRR_CID_EVENT.OPENCLOSE_GROUP,
LRR_CID_EVENT.OPENCLOSE_AUTOMATIC,
LRR_CID_EVENT.OPENCLOSE_REMOTE_ARMDISARM,
LRR_CID_EVENT.OPENCLOSE_QUICK_ARM,
LRR_CID_EVENT.OPENCLOSE_KEYSWITCH,
LRR_CID_EVENT.OPENCLOSE_ARMED_STAY, # HACK: Not sure if I like having these in here.
LRR_CID_EVENT.OPENCLOSE_KEYSWITCH_ARMED_STAY
]

# LRR events that should be considered Arm Stay events.
LRR_STAY_EVENTS = [
LRR_CID_EVENT.OPENCLOSE_ARMED_STAY,
LRR_CID_EVENT.OPENCLOSE_KEYSWITCH_ARMED_STAY
]

+ 113
- 0
alarmdecoder/messages/lrr/message.py View File

@@ -0,0 +1,113 @@
"""
Message representations received from the panel through the `AlarmDecoder`_ (AD2)
devices.

:py:class:`LRRMessage`: Message received from a long-range radio module.

.. _AlarmDecoder: http://www.alarmdecoder.com

.. moduleauthor:: Scott Petersen <scott@nutech.com>
"""

from .. import BaseMessage
from ...util import InvalidMessageError

from .events import LRR_EVENT_TYPE, get_event_description, get_event_source


class LRRMessage(BaseMessage):
"""
Represent a message from a Long Range Radio or emulated Long Range Radio.
"""

event_data = None
"""Data associated with the LRR message. Usually user ID or zone."""
partition = -1
"""The partition that this message applies to."""
event_type = None
"""The type of the event that occurred."""
version = 0
"""LRR message version"""

report_code = 0xFF
"""The report code used to override the last two digits of the event type."""
event_prefix = ''
"""Extracted prefix for the event_type."""
event_source = LRR_EVENT_TYPE.UNKNOWN
"""Extracted event type source."""
event_status = 0
"""Event status flag that represents triggered or restored events."""
event_code = 0
"""Event code for the LRR message."""
event_description = ''
"""Human-readable description of LRR event."""

def __init__(self, data=None, skip_report_override=False):
"""
Constructor

:param data: message data to parse
:type data: string
"""
BaseMessage.__init__(self, data)

self.skip_report_override = skip_report_override

if data is not None:
self._parse_message(data)

def _parse_message(self, data):
"""
Parses the raw message from the device.

:param data: message data to parse
:type data: string

:raises: :py:class:`~alarmdecoder.util.InvalidMessageError`
"""
try:
_, values = data.split(':')
values = values.split(',')

# Handle older-format events
if len(values) <= 3:
self.event_data, self.partition, self.event_type = values
self.version = 1

# Newer-format events
else:
self.event_data, self.partition, self.event_type, self.report_code = values
self.version = 2

event_type_data = self.event_type.split('_')
self.event_prefix = event_type_data[0] # Ex: CID
self.event_source = get_event_source(self.event_prefix) # Ex: LRR_EVENT_TYPE.CID
self.event_status = int(event_type_data[1][0]) # Ex: 1 or 3
self.event_code = int(event_type_data[1][1:], 16) # Ex: 0x100 = Medical

# replace last 2 digits of event_code with report_code, if applicable.
if not self.skip_report_override and self.report_code not in ['00', 'ff']:
self.event_code = int(event_type_data[1][1] + self.report_code, 16)
self.event_description = get_event_description(self.event_source, self.event_code)

except ValueError:
raise InvalidMessageError('Received invalid message: {0}'.format(data))


def dict(self, **kwargs):
"""
Dictionary representation
"""
return dict(
time = self.timestamp,
event_data = self.event_data,
event_type = self.event_type,
partition = self.partition,
report_code = self.report_code,
event_prefix = self.event_prefix,
event_source = self.event_source,
event_status = self.event_status,
event_code = hex(self.event_code),
event_description = self.event_description,
**kwargs
)

+ 164
- 0
alarmdecoder/messages/lrr/system.py View File

@@ -0,0 +1,164 @@
"""
Primary system for handling LRR events.

.. moduleauthor:: Scott Petersen <scott@nutech.com>
"""

from .events import LRR_EVENT_TYPE, LRR_EVENT_STATUS, LRR_CID_EVENT
from .events import LRR_FIRE_EVENTS, LRR_POWER_EVENTS, LRR_BYPASS_EVENTS, LRR_BATTERY_EVENTS, \
LRR_PANIC_EVENTS, LRR_ARM_EVENTS, LRR_STAY_EVENTS, LRR_ALARM_EVENTS


class LRRSystem(object):
"""
Handles LRR events and triggers higher-level events in the AlarmDecoder object.
"""

def __init__(self, alarmdecoder_object):
"""
Constructor

:param alarmdecoder_object: Main AlarmDecoder object
:type alarmdecoder_object: :py:class:`~alarmdecoder.AlarmDecoder`
"""
self._alarmdecoder = alarmdecoder_object

def update(self, message):
"""
Updates the states in the primary AlarmDecoder object based on
the LRR message provided.

:param message: LRR message object
:type message: :py:class:`~alarmdecoder.messages.LRRMessage`
"""
# Firmware version < 2.2a.8.6
if message.version == 1:
if message.event_type == 'ALARM_PANIC':
self._alarmdecoder._update_panic_status(True)
elif message.event_type == 'CANCEL':
self._alarmdecoder._update_panic_status(False)

# Firmware version >= 2.2a.8.6
elif message.version == 2:
source = message.event_source
if source == LRR_EVENT_TYPE.CID:
self._handle_cid_message(message)
elif source == LRR_EVENT_TYPE.DSC:
self._handle_dsc_message(message)
elif source == LRR_EVENT_TYPE.ADEMCO:
self._handle_ademco_message(message)
elif source == LRR_EVENT_TYPE.ALARMDECODER:
self._handle_alarmdecoder_message(message)
elif source == LRR_EVENT_TYPE.UNKNOWN:
self._handle_unknown_message(message)
else:
pass

def _handle_cid_message(self, message):
"""
Handles ContactID LRR events.

:param message: LRR message object
:type message: :py:class:`~alarmdecoder.messages.LRRMessage`
"""
status = self._get_event_status(message)
if status is None:
return

if message.event_code in LRR_FIRE_EVENTS:
if message.event_code == LRR_CID_EVENT.OPENCLOSE_CANCEL_BY_USER:
status = False

self._alarmdecoder._update_fire_status(status=status)
if message.event_code in LRR_ALARM_EVENTS:
kwargs = {}
field_name = 'zone'
if not status:
field_name = 'user'

kwargs[field_name] = int(message.event_data)
self._alarmdecoder._update_alarm_status(status=status, **kwargs)

if message.event_code in LRR_POWER_EVENTS:
self._alarmdecoder._update_power_status(status=status)

if message.event_code in LRR_BYPASS_EVENTS:
self._alarmdecoder._update_zone_bypass_status(status=status, zone=int(message.event_data))

if message.event_code in LRR_BATTERY_EVENTS:
self._alarmdecoder._update_battery_status(status=status)

if message.event_code in LRR_PANIC_EVENTS:
if message.event_code == LRR_CID_EVENT.OPENCLOSE_CANCEL_BY_USER:
status = False

self._alarmdecoder._update_panic_status(status=status)

if message.event_code in LRR_ARM_EVENTS:
# NOTE: status on OPENCLOSE messages is backwards.
status_stay = (message.event_status == LRR_EVENT_STATUS.RESTORE \
and message.event_code in LRR_STAY_EVENTS)

if status_stay:
status = False
else:
status = not status

self._alarmdecoder._update_armed_status(status=status, status_stay=status_stay)

def _handle_dsc_message(self, message):
"""
Handles DSC LRR events.

:param message: LRR message object
:type message: :py:class:`~alarmdecoder.messages.LRRMessage`
"""
pass

def _handle_ademco_message(self, message):
"""
Handles ADEMCO LRR events.

:param message: LRR message object
:type message: :py:class:`~alarmdecoder.messages.LRRMessage`
"""
pass

def _handle_alarmdecoder_message(self, message):
"""
Handles AlarmDecoder LRR events.

:param message: LRR message object
:type message: :py:class:`~alarmdecoder.messages.LRRMessage`
"""
pass

def _handle_unknown_message(self, message):
"""
Handles UNKNOWN LRR events.

:param message: LRR message object
:type message: :py:class:`~alarmdecoder.messages.LRRMessage`
"""
# TODO: Log this somewhere useful.
pass

def _get_event_status(self, message):
"""
Retrieves the boolean status of an LRR message.

:param message: LRR message object
:type message: :py:class:`~alarmdecoder.messages.LRRMessage`

:returns: Boolean indicating whether the event was triggered or restored.
"""
status = None

if message.event_status == LRR_EVENT_STATUS.TRIGGER:
status = True
elif message.event_status == LRR_EVENT_STATUS.RESTORE:
status = False

return status

+ 190
- 0
alarmdecoder/messages/panel_message.py View File

@@ -0,0 +1,190 @@
"""
Message representations received from the panel through the `AlarmDecoder`_ (AD2)
devices.

:py:class:`Message`: The standard and most common message received from a panel.

.. _AlarmDecoder: http://www.alarmdecoder.com

.. moduleauthor:: Scott Petersen <scott@nutech.com>
"""

import re

from . import BaseMessage
from ..util import InvalidMessageError
from ..panels import PANEL_TYPES, ADEMCO, DSC

class Message(BaseMessage):
"""
Represents a message from the alarm panel.
"""

ready = False
"""Indicates whether or not the panel is in a ready state."""
armed_away = False
"""Indicates whether or not the panel is armed away."""
armed_home = False
"""Indicates whether or not the panel is armed home."""
backlight_on = False
"""Indicates whether or not the keypad backlight is on."""
programming_mode = False
"""Indicates whether or not we're in programming mode."""
beeps = -1
"""Number of beeps associated with a message."""
zone_bypassed = False
"""Indicates whether or not a zone is bypassed."""
ac_power = False
"""Indicates whether or not the panel is on AC power."""
chime_on = False
"""Indicates whether or not the chime is enabled."""
alarm_event_occurred = False
"""Indicates whether or not an alarm event has occurred."""
alarm_sounding = False
"""Indicates whether or not an alarm is sounding."""
battery_low = False
"""Indicates whether or not there is a low battery."""
entry_delay_off = False
"""Indicates whether or not the entry delay is enabled."""
fire_alarm = False
"""Indicates whether or not a fire alarm is sounding."""
check_zone = False
"""Indicates whether or not there are zones that require attention."""
perimeter_only = False
"""Indicates whether or not the perimeter is armed."""
system_fault = False
"""Indicates whether a system fault has occurred."""
panel_type = ADEMCO
"""Indicates which panel type was the source of this message."""
numeric_code = None
"""The numeric code associated with the message."""
text = None
"""The human-readable text to be displayed on the panel LCD."""
cursor_location = -1
"""Current cursor location on the keypad."""
mask = 0xFFFFFFFF
"""Address mask this message is intended for."""
bitfield = None
"""The bitfield associated with this message."""
panel_data = None
"""The panel data field associated with this message."""

def __init__(self, data=None):
"""
Constructor

:param data: message data to parse
:type data: string
"""
BaseMessage.__init__(self, data)

self._regex = re.compile('^(!KPM:){0,1}(\[[a-fA-F0-9\-]+\]),([a-fA-F0-9]+),(\[[a-fA-F0-9]+\]),(".+")$')

if data is not None:
self._parse_message(data)

def _parse_message(self, data):
"""
Parse the message from the device.

:param data: message data
:type data: string

:raises: :py:class:`~alarmdecoder.util.InvalidMessageError`
"""
match = self._regex.match(str(data))

if match is None:
raise InvalidMessageError('Received invalid message: {0}'.format(data))

header, self.bitfield, self.numeric_code, self.panel_data, alpha = match.group(1, 2, 3, 4, 5)

is_bit_set = lambda bit: not self.bitfield[bit] == "0"

self.ready = is_bit_set(1)
self.armed_away = is_bit_set(2)
self.armed_home = is_bit_set(3)
self.backlight_on = is_bit_set(4)
self.programming_mode = is_bit_set(5)
self.beeps = int(self.bitfield[6], 16)
self.zone_bypassed = is_bit_set(7)
self.ac_power = is_bit_set(8)
self.chime_on = is_bit_set(9)
self.alarm_event_occurred = is_bit_set(10)
self.alarm_sounding = is_bit_set(11)
self.battery_low = is_bit_set(12)
self.entry_delay_off = is_bit_set(13)
self.fire_alarm = is_bit_set(14)
self.check_zone = is_bit_set(15)
self.perimeter_only = is_bit_set(16)
self.system_fault = is_bit_set(17)
if self.bitfield[18] in list(PANEL_TYPES):
self.panel_type = PANEL_TYPES[self.bitfield[18]]
# pos 20-21 - Unused.
self.text = alpha.strip('"')
self.mask = int(self.panel_data[3:3+8], 16)

if self.panel_type in (ADEMCO, DSC):
if int(self.panel_data[19:21], 16) & 0x01 > 0:
# Current cursor location on the alpha display.
self.cursor_location = int(self.panel_data[21:23], 16)

def parse_numeric_code(self, force_hex=False):
"""
Parses and returns the numeric code as an integer.

The numeric code can be either base 10 or base 16, depending on
where the message came from.

:param force_hex: force the numeric code to be processed as base 16.
:type force_hex: boolean

:raises: ValueError
"""
code = None
got_error = False

if not force_hex:
try:
code = int(self.numeric_code)
except ValueError:
got_error = True

if force_hex or got_error:
try:
code = int(self.numeric_code, 16)
except ValueError:
raise

return code

def dict(self, **kwargs):
"""
Dictionary representation.
"""
return dict(
time = self.timestamp,
bitfield = self.bitfield,
numeric_code = self.numeric_code,
panel_data = self.panel_data,
mask = self.mask,
ready = self.ready,
armed_away = self.armed_away,
armed_home = self.armed_home,
backlight_on = self.backlight_on,
programming_mode = self.programming_mode,
beeps = self.beeps,
zone_bypassed = self.zone_bypassed,
ac_power = self.ac_power,
chime_on = self.chime_on,
alarm_event_occurred = self.alarm_event_occurred,
alarm_sounding = self.alarm_sounding,
battery_low = self.battery_low,
entry_delay_off = self.entry_delay_off,
fire_alarm = self.fire_alarm,
check_zone = self.check_zone,
perimeter_only = self.perimeter_only,
text = self.text,
cursor_location = self.cursor_location,
**kwargs
)

+ 82
- 0
alarmdecoder/messages/rf_message.py View File

@@ -0,0 +1,82 @@
"""
Message representations received from the panel through the `AlarmDecoder`_ (AD2)
devices.

:py:class:`RFMessage`: Message received from an RF receiver module.

.. _AlarmDecoder: http://www.alarmdecoder.com

.. moduleauthor:: Scott Petersen <scott@nutech.com>
"""

from . import BaseMessage
from ..util import InvalidMessageError

class RFMessage(BaseMessage):
"""
Represents a message from an RF receiver.
"""

serial_number = None
"""Serial number of the RF device."""
value = -1
"""Value associated with this message."""
battery = False
"""Low battery indication"""
supervision = False
"""Supervision required indication"""
loop = [False for _ in list(range(4))]
"""Loop indicators"""

def __init__(self, data=None):
"""
Constructor

:param data: message data to parse
:type data: string
"""
BaseMessage.__init__(self, data)

if data is not None:
self._parse_message(data)

def _parse_message(self, data):
"""
Parses the raw message from the device.

:param data: message data
:type data: string

:raises: :py:class:`~alarmdecoder.util.InvalidMessageError`
"""
try:
_, values = data.split(':')
self.serial_number, self.value = values.split(',')
self.value = int(self.value, 16)

is_bit_set = lambda b: self.value & (1 << (b - 1)) > 0

# Bit 1 = unknown
self.battery = is_bit_set(2)
self.supervision = is_bit_set(3)
# Bit 4 = unknown
self.loop[2] = is_bit_set(5)
self.loop[1] = is_bit_set(6)
self.loop[3] = is_bit_set(7)
self.loop[0] = is_bit_set(8)

except ValueError:
raise InvalidMessageError('Received invalid message: {0}'.format(data))

def dict(self, **kwargs):
"""
Dictionary representation.
"""
return dict(
time = self.timestamp,
serial_number = self.serial_number,
value = self.value,
battery = self.battery,
supervision = self.supervision,
**kwargs
)

+ 7
- 0
alarmdecoder/states.py View File

@@ -0,0 +1,7 @@
class FireState:
"""
Fire alarm status
"""
NONE = 0
ALARM = 1
ACKNOWLEDGED = 2

+ 39
- 0
alarmdecoder/util.py View File

@@ -9,6 +9,7 @@ Provides utility classes for the `AlarmDecoder`_ (AD2) devices.
import time
import threading
import select
import sys
import alarmdecoder

from io import open
@@ -58,6 +59,15 @@ class UploadChecksumError(UploadError):


def bytes_available(device):
"""
Determines the number of bytes available for reading from an
AlarmDecoder device

:param device: the AlarmDecoder device
:type device: :py:class:`~alarmdecoder.devices.Device`

:returns: int
"""
bytes_avail = 0

if isinstance(device, alarmdecoder.devices.SerialDevice):
@@ -70,7 +80,28 @@ def bytes_available(device):

return bytes_avail

def bytes_hack(buf):
"""
Hacky workaround for old installs of the library on systems without python-future that were
keeping the 2to3 update from working after auto-update.
"""
ub = None
if sys.version_info > (3,):
ub = buf
else:
ub = bytes(buf)

return ub

def read_firmware_file(file_path):
"""
Reads a firmware file into a dequeue for processing.

:param file_path: Path to the firmware file
:type file_path: string

:returns: deque
"""
data_queue = deque()

with open(file_path) as firmware_handle:
@@ -99,6 +130,14 @@ class Firmware(object):

@staticmethod
def read(device):
"""
Reads data from the specified device.

:param device: the AlarmDecoder device
:type device: :py:class:`~alarmdecoder.devices.Device`

:returns: string
"""
response = None
bytes_avail = bytes_available(device)



+ 2
- 8
alarmdecoder/zonetracking.py View File

@@ -177,14 +177,8 @@ class Zonetracker(object):
self._last_zone_fault = 0

# Process fault
elif message.check_zone or message.text.startswith("FAULT") or message.text.startswith("ALARM"):
# Apparently this representation can be both base 10
# or base 16, depending on where the message came
# from.
try:
zone = int(message.numeric_code)
except ValueError:
zone = int(message.numeric_code, 16)
elif self.alarmdecoder_object.mode != DSC and (message.check_zone or message.text.startswith("FAULT") or message.text.startswith("ALARM")):
zone = message.parse_numeric_code()

# NOTE: Odd case for ECP failures. Apparently they report as
# zone 191 (0xBF) regardless of whether or not the


+ 69
- 41
test/test_ad2.py View File

@@ -10,6 +10,9 @@ from alarmdecoder.devices import USBDevice
from alarmdecoder.messages import Message, RFMessage, LRRMessage, ExpanderMessage
from alarmdecoder.event.event import Event, EventHandler
from alarmdecoder.zonetracking import Zonetracker
from alarmdecoder.panels import ADEMCO, DSC
from alarmdecoder.messages.lrr import LRR_EVENT_TYPE, LRR_EVENT_STATUS
from alarmdecoder.states import FireState


class TestAlarmDecoder(TestCase):
@@ -66,6 +69,7 @@ class TestAlarmDecoder(TestCase):
def tearDown(self):
pass

### Library events
def on_panic(self, sender, *args, **kwargs):
self._panicked = kwargs['status']

@@ -123,6 +127,7 @@ class TestAlarmDecoder(TestCase):
def on_zone_restore(self, sender, *args, **kwargs):
self._zone_restored = kwargs['zone']

### Tests
def test_open(self):
self._decoder.open()
self._device.open.assert_any_calls()
@@ -183,108 +188,132 @@ class TestAlarmDecoder(TestCase):
self.assertTrue(self._expander_message_received)

def test_relay_message(self):
self._decoder.open()
msg = self._decoder._handle_message(b'!REL:12,01,01')
self.assertIsInstance(msg, ExpanderMessage)
self.assertEqual(self._relay_changed, True)
self.assertTrue(self._relay_changed)

def test_rfx_message(self):
msg = self._decoder._handle_message(b'!RFX:0180036,80')
self.assertIsInstance(msg, RFMessage)
self.assertTrue(self._rfx_message_received)

def test_panic(self):
self._decoder.open()

def test_panic_v1(self):
# LRR v1
msg = self._decoder._handle_message(b'!LRR:012,1,ALARM_PANIC')
self.assertEquals(self._panicked, True)
self.assertIsInstance(msg, LRRMessage)
self.assertTrue(self._panicked)

msg = self._decoder._handle_message(b'!LRR:012,1,CANCEL')
self.assertEquals(self._panicked, False)
self.assertIsInstance(msg, LRRMessage)
self.assertFalse(self._panicked)

def test_config_message(self):
self._decoder.open()
def test_panic_v2(self):
# LRR v2
msg = self._decoder._handle_message(b'!LRR:099,1,CID_1123,ff') # Panic
self.assertIsInstance(msg, LRRMessage)
self.assertTrue(self._panicked)

msg = self._decoder._handle_message(b'!CONFIG>ADDRESS=18&CONFIGBITS=ff00&LRR=N&EXP=NNNNN&REL=NNNN&MASK=ffffffff&DEDUPLICATE=N')
msg = self._decoder._handle_message(b'!LRR:001,1,CID_1406,ff') # Cancel
self.assertIsInstance(msg, LRRMessage)
self.assertFalse(self._panicked)

def test_config_message(self):
msg = self._decoder._handle_message(b'!CONFIG>MODE=A&CONFIGBITS=ff04&ADDRESS=18&LRR=N&COM=N&EXP=NNNNN&REL=NNNN&MASK=ffffffff&DEDUPLICATE=N')
self.assertEquals(self._decoder.mode, ADEMCO)
self.assertEquals(self._decoder.address, 18)
self.assertEquals(self._decoder.configbits, int('ff00', 16))
self.assertEquals(self._decoder.configbits, int('ff04', 16))
self.assertEquals(self._decoder.address_mask, int('ffffffff', 16))
self.assertEquals(self._decoder.emulate_zone, [False for x in range(5)])
self.assertEquals(self._decoder.emulate_relay, [False for x in range(4)])
self.assertEquals(self._decoder.emulate_lrr, False)
self.assertEquals(self._decoder.deduplicate, False)
self.assertEqual(self._got_config, True)
self.assertFalse(self._decoder.emulate_lrr)
self.assertFalse(self._decoder.emulate_com)
self.assertFalse(self._decoder.deduplicate)
self.assertTrue(self._got_config)

def test_power_changed_event(self):
msg = self._decoder._handle_message(b'[0000000100000000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._power_changed, False) # Not set first time we hit it.
self.assertFalse(self._power_changed) # Not set first time we hit it.

msg = self._decoder._handle_message(b'[0000000000000000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._power_changed, False)
self.assertFalse(self._power_changed)

msg = self._decoder._handle_message(b'[0000000100000000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._power_changed, True)
self.assertTrue(self._power_changed)

def test_alarm_event(self):
msg = self._decoder._handle_message(b'[0000000000100000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._alarmed, False) # Not set first time we hit it.
self.assertFalse(self._alarmed) # Not set first time we hit it.

msg = self._decoder._handle_message(b'[0000000000000000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._alarmed, False)
self.assertEquals(self._alarm_restored, True)
self.assertFalse(self._alarmed)
self.assertTrue(self._alarm_restored)

msg = self._decoder._handle_message(b'[0000000000100000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._alarmed, True)
self.assertTrue(self._alarmed)

def test_zone_bypassed_event(self):
msg = self._decoder._handle_message(b'[0000001000000000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._bypassed, False) # Not set first time we hit it.

msg = self._decoder._handle_message(b'[0000000000000000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._bypassed, False)
self.assertFalse(self._bypassed)

msg = self._decoder._handle_message(b'[0000001000000000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._bypassed, True)
self.assertTrue(self._bypassed)

def test_armed_away_event(self):
msg = self._decoder._handle_message(b'[0100000000000000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._armed, False) # Not set first time we hit it.
self.assertFalse(self._armed) # Not set first time we hit it.

msg = self._decoder._handle_message(b'[0100000000000000----],000,[f707000600e5800c0c020000]," "')
self.assertFalse(self._armed)

msg = self._decoder._handle_message(b'[0000000000000000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._armed, False)
self.assertFalse(self._armed)

msg = self._decoder._handle_message(b'[0100000000000000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._armed, True)
self.assertTrue(self._armed)

self._armed = False

msg = self._decoder._handle_message(b'[0010000000000000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._armed, False) # Not set first time we hit it.
self.assertTrue(self._armed)

msg = self._decoder._handle_message(b'[0000000000000000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._armed, False)

msg = self._decoder._handle_message(b'[0010000000000000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._armed, True)
self.assertFalse(self._armed)

def test_battery_low_event(self):
msg = self._decoder._handle_message(b'[0000000000010000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._battery, True)
self.assertTrue(self._battery)

# force the timeout to expire.
with patch.object(time, 'time', return_value=self._decoder._battery_status[1] + 35):
msg = self._decoder._handle_message(b'[0000000000000000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._battery, False)
self.assertFalse(self._battery)

def test_fire_alarm_event(self):
self._fire = FireState.NONE

msg = self._decoder._handle_message(b'[0000000000000100----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._fire, True)
self.assertEquals(self._fire, FireState.ALARM)

# force the timeout to expire.
with patch.object(time, 'time', return_value=self._decoder._battery_status[1] + 35):
with patch.object(time, 'time', return_value=self._decoder._fire_status[1] + 35):
msg = self._decoder._handle_message(b'[0000000000000000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._fire, FireState.NONE)

def test_fire_lrr(self):
self._fire = FireState.NONE

msg = self._decoder._handle_message(b'!LRR:095,1,CID_1110,ff') # Fire: Non-specific

self.assertIsInstance(msg, LRRMessage)
self.assertEquals(self._fire, FireState.ALARM)

msg = self._decoder._handle_message(b'!LRR:001,1,CID_1406,ff') # Open/Close: Cancel
self.assertIsInstance(msg, LRRMessage)
self.assertEquals(self._fire, FireState.ACKNOWLEDGED)

# force the timeout to expire.
with patch.object(time, 'time', return_value=self._decoder._fire_status[1] + 35):
msg = self._decoder._handle_message(b'[0000000000000000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._fire, False)
self.assertEquals(self._fire, FireState.NONE)

def test_hit_for_faults(self):
self._decoder._handle_message(b'[0000000000000000----],000,[f707000600e5800c0c020000],"Hit * for faults "')
@@ -314,4 +343,3 @@ class TestAlarmDecoder(TestCase):

self._decoder._on_read(self, data=b'[00010001000000000A--],004,[f70000051003000008020000000000],"FAULT 04 "')
self.assertEquals(self._zone_restored, 3)


+ 14
- 8
test/test_devices.py View File

@@ -48,12 +48,14 @@ class TestUSBDevice(TestCase):
def tearDown(self):
self._device.close()

### Library events
def attached_event(self, sender, *args, **kwargs):
self._attached = True

def detached_event(self, sender, *args, **kwargs):
self._detached = True

### Tests
def test_find_default_param(self):
with patch.object(Ftdi, 'find_all', return_value=[(0, 0, 'AD2', 1, 'AD2')]):
device = USBDevice.find()
@@ -69,8 +71,8 @@ class TestUSBDevice(TestCase):
self.assertEqual(device.interface, 'AD2-2')

def test_events(self):
self.assertEqual(self._attached, False)
self.assertEqual(self._detached, False)
self.assertFalse(self._attached)
self.assertFalse(self._detached)

# this is ugly, but it works.
with patch.object(USBDevice, 'find_all', return_value=[(0, 0, 'AD2-1', 1, 'AD2'), (0, 0, 'AD2-2', 1, 'AD2')]):
@@ -81,8 +83,8 @@ class TestUSBDevice(TestCase):
time.sleep(1)
USBDevice.stop_detection()

self.assertEqual(self._attached, True)
self.assertEqual(self._detached, True)
self.assertTrue(self._attached)
self.assertTrue(self._detached)

def test_find_all(self):
with patch.object(USBDevice, 'find_all', return_value=[]) as mock:
@@ -149,6 +151,7 @@ class TestSerialDevice(TestCase):
def tearDown(self):
self._device.close()

### Tests
def test_open(self):
self._device.interface = '/dev/ttyS0'

@@ -249,6 +252,7 @@ class TestSocketDevice(TestCase):
def tearDown(self):
self._device.close()

### Tests
def test_open(self):
with patch.object(socket.socket, '__init__', return_value=None):
with patch.object(socket.socket, 'connect', return_value=None) as mock:
@@ -411,12 +415,14 @@ if have_pyftdi:
def tearDown(self):
self._device.close()

### Library events
def attached_event(self, sender, *args, **kwargs):
self._attached = True

def detached_event(self, sender, *args, **kwargs):
self._detached = True

### Tests
def test_find_default_param(self):
with patch.object(Ftdi, 'find_all', return_value=[(0, 0, 'AD2', 1, 'AD2')]):
device = USBDevice.find()
@@ -432,8 +438,8 @@ if have_pyftdi:
self.assertEquals(device.interface, 'AD2-2')

def test_events(self):
self.assertEquals(self._attached, False)
self.assertEquals(self._detached, False)
self.assertFalse(self._attached)
self.assertFalse(self._detached)

# this is ugly, but it works.
with patch.object(USBDevice, 'find_all', return_value=[(0, 0, 'AD2-1', 1, 'AD2'), (0, 0, 'AD2-2', 1, 'AD2')]):
@@ -444,8 +450,8 @@ if have_pyftdi:
time.sleep(1)
USBDevice.stop_detection()

self.assertEquals(self._attached, True)
self.assertEquals(self._detached, True)
self.assertTrue(self._attached)
self.assertTrue(self._detached)

def test_find_all(self):
with patch.object(USBDevice, 'find_all', return_value=[]) as mock:


+ 47
- 3
test/test_messages.py View File

@@ -1,7 +1,9 @@
from unittest import TestCase

from alarmdecoder.messages import Message, ExpanderMessage, RFMessage, LRRMessage
from alarmdecoder.messages.lrr import LRR_EVENT_TYPE, LRR_CID_EVENT, LRR_EVENT_STATUS
from alarmdecoder.util import InvalidMessageError
from alarmdecoder.panels import ADEMCO


class TestMessages(TestCase):
@@ -11,10 +13,32 @@ class TestMessages(TestCase):
def tearDown(self):
pass

### Tests
def test_message_parse(self):
msg = Message('[0000000000000000----],001,[f707000600e5800c0c020000],"FAULT 1 "')

msg = Message('[00000000000000000A--],001,[f707000600e5800c0c020000],"FAULT 1 "')

self.assertFalse(msg.ready)
self.assertFalse(msg.armed_away)
self.assertFalse(msg.armed_home)
self.assertFalse(msg.backlight_on)
self.assertFalse(msg.programming_mode)
self.assertEqual(msg.beeps, 0)
self.assertFalse(msg.zone_bypassed)
self.assertFalse(msg.ac_power)
self.assertFalse(msg.chime_on)
self.assertFalse(msg.alarm_event_occurred)
self.assertFalse(msg.alarm_sounding)
self.assertFalse(msg.battery_low)
self.assertFalse(msg.entry_delay_off)
self.assertFalse(msg.fire_alarm)
self.assertFalse(msg.check_zone)
self.assertFalse(msg.perimeter_only)
self.assertFalse(msg.system_fault)
self.assertFalse(msg.panel_type, ADEMCO)
self.assertEqual(msg.numeric_code, '001')
self.assertEqual(msg.mask, int('07000600', 16))
self.assertEqual(msg.cursor_location, -1)
self.assertEqual(msg.text, 'FAULT 1 ')

def test_message_parse_fail(self):
with self.assertRaises(InvalidMessageError):
@@ -24,6 +48,8 @@ class TestMessages(TestCase):
msg = ExpanderMessage('!EXP:07,01,01')

self.assertEqual(msg.address, 7)
self.assertEqual(msg.channel, 1)
self.assertEqual(msg.value, 1)

def test_expander_message_parse_fail(self):
with self.assertRaises(InvalidMessageError):
@@ -33,16 +59,34 @@ class TestMessages(TestCase):
msg = RFMessage('!RFX:0180036,80')

self.assertEqual(msg.serial_number, '0180036')
self.assertEqual(msg.value, int('80', 16))

def test_rf_message_parse_fail(self):
with self.assertRaises(InvalidMessageError):
msg = RFMessage('')

def test_lrr_message_parse(self):
def test_lrr_message_parse_v1(self):
msg = LRRMessage('!LRR:012,1,ARM_STAY')

self.assertEqual(msg.event_data, '012')
self.assertEqual(msg.partition, '1')
self.assertEqual(msg.event_type, 'ARM_STAY')

def test_lrr_message_parse_v2(self):
msg = LRRMessage(b'!LRR:001,1,CID_3401,ff')
self.assertIsInstance(msg, LRRMessage)
self.assertEquals(msg.event_data, '001')
self.assertEquals(msg.partition, '1')
self.assertEquals(msg.event_prefix, 'CID')
self.assertEquals(msg.event_source, LRR_EVENT_TYPE.CID)
self.assertEquals(msg.event_status, LRR_EVENT_STATUS.RESTORE)
self.assertEquals(msg.event_code, LRR_CID_EVENT.OPENCLOSE_BY_USER)
self.assertEquals(msg.report_code, 'ff')

def test_lrr_event_code_override(self):
msg = LRRMessage(b'!LRR:001,1,CID_3400,01')
self.assertEquals(msg.event_code, LRR_CID_EVENT.OPENCLOSE_BY_USER) # 400 -> 401

def test_lrr_message_parse_fail(self):
with self.assertRaises(InvalidMessageError):
msg = LRRMessage('')

+ 3
- 0
test/test_zonetracking.py View File

@@ -23,18 +23,21 @@ class TestZonetracking(TestCase):
def tearDown(self):
pass

### Library events
def fault_event(self, sender, *args, **kwargs):
self._faulted = True

def restore_event(self, sender, *args, **kwargs):
self._restored = True

### Util
def _build_expander_message(self, msg):
msg = ExpanderMessage(msg)
zone = self._zonetracker.expander_to_zone(msg.address, msg.channel)

return zone, msg

### Tests
def test_zone_fault(self):
zone, msg = self._build_expander_message('!EXP:07,01,01')
self._zonetracker.update(msg)


Loading…
Cancel
Save