Browse Source

2to3 conversions

pyserial_fix
Kevin Gottsman 8 years ago
parent
commit
6e9efed50f
17 changed files with 127 additions and 127 deletions
  1. +6
    -6
      alarmdecoder/decoder.py
  2. +20
    -20
      alarmdecoder/devices.py
  3. +1
    -1
      alarmdecoder/messages.py
  4. +4
    -4
      alarmdecoder/zonetracking.py
  5. +3
    -3
      examples/alarm_email.py
  6. +3
    -3
      examples/lrr_example.py
  7. +3
    -3
      examples/rf_device.py
  8. +3
    -3
      examples/serialport.py
  9. +3
    -3
      examples/socket_example.py
  10. +3
    -3
      examples/ssl_socket.py
  11. +7
    -7
      examples/usb_detection.py
  12. +3
    -3
      examples/usb_device.py
  13. +4
    -4
      examples/virtual_zone_expander.py
  14. +35
    -35
      test/test_ad2.py
  15. +19
    -19
      test/test_devices.py
  16. +4
    -4
      test/test_messages.py
  17. +6
    -6
      test/test_zonetracking.py

+ 6
- 6
alarmdecoder/decoder.py View File

@@ -51,15 +51,15 @@ class AlarmDecoder(object):
on_write = event.Event("This event is called when data has been written to the device.\n\n**Callback definition:** *def callback(device, data)*") on_write = event.Event("This event is called when data has been written to the device.\n\n**Callback definition:** *def callback(device, data)*")


# Constants # Constants
KEY_F1 = unichr(1) + unichr(1) + unichr(1)
KEY_F1 = chr(1) + chr(1) + chr(1)
"""Represents panel function key #1""" """Represents panel function key #1"""
KEY_F2 = unichr(2) + unichr(2) + unichr(2)
KEY_F2 = chr(2) + chr(2) + chr(2)
"""Represents panel function key #2""" """Represents panel function key #2"""
KEY_F3 = unichr(3) + unichr(3) + unichr(3)
KEY_F3 = chr(3) + chr(3) + chr(3)
"""Represents panel function key #3""" """Represents panel function key #3"""
KEY_F4 = unichr(4) + unichr(4) + unichr(4)
KEY_F4 = chr(4) + chr(4) + chr(4)
"""Represents panel function key #4""" """Represents panel function key #4"""
KEY_PANIC = unichr(5) + unichr(5) + unichr(5)
KEY_PANIC = chr(5) + chr(5) + chr(5)
"""Represents a panic keypress""" """Represents a panic keypress"""


BATTERY_TIMEOUT = 30 BATTERY_TIMEOUT = 30
@@ -258,7 +258,7 @@ class AlarmDecoder(object):
''.join(['Y' if r else 'N' for r in self.emulate_relay]))) ''.join(['Y' if r else 'N' for r in self.emulate_relay])))
config_entries.append(('LRR', 'Y' if self.emulate_lrr else 'N')) config_entries.append(('LRR', 'Y' if self.emulate_lrr else 'N'))
config_entries.append(('DEDUPLICATE', 'Y' if self.deduplicate else 'N')) config_entries.append(('DEDUPLICATE', 'Y' if self.deduplicate else 'N'))
config_entries.append(('MODE', PANEL_TYPES.keys()[PANEL_TYPES.values().index(self.mode)]))
config_entries.append(('MODE', list(PANEL_TYPES.keys())[list(PANEL_TYPES.values()).index(self.mode)]))


return '&'.join(['='.join(t) for t in config_entries]) return '&'.join(['='.join(t) for t in config_entries])




+ 20
- 20
alarmdecoder/devices.py View File

@@ -172,10 +172,10 @@ class Device(object):
except SSL.WantReadError: except SSL.WantReadError:
pass pass


except CommError, err:
except CommError as err:
self._device.close() self._device.close()


except Exception, err:
except Exception as err:
self._device.close() self._device.close()
self._running = False self._running = False
raise raise
@@ -227,7 +227,7 @@ class USBDevice(Device):
try: try:
cls.__devices = Ftdi.find_all(query, nocache=True) cls.__devices = Ftdi.find_all(query, nocache=True)


except (usb.core.USBError, FtdiError), err:
except (usb.core.USBError, FtdiError) as err:
raise CommError('Error enumerating AD2USB devices: {0}'.format(str(err)), err) raise CommError('Error enumerating AD2USB devices: {0}'.format(str(err)), err)


return cls.__devices return cls.__devices
@@ -434,10 +434,10 @@ class USBDevice(Device):


self._id = self._serial_number self._id = self._serial_number


except (usb.core.USBError, FtdiError), err:
except (usb.core.USBError, FtdiError) as err:
raise NoDeviceError('Error opening device: {0}'.format(str(err)), err) raise NoDeviceError('Error opening device: {0}'.format(str(err)), err)


except KeyError, err:
except KeyError as err:
raise NoDeviceError('Unsupported device. ({0:04x}:{1:04x}) You probably need a newer version of pyftdi.'.format(err[0][0], err[0][1])) raise NoDeviceError('Unsupported device. ({0:04x}:{1:04x}) You probably need a newer version of pyftdi.'.format(err[0][0], err[0][1]))


else: else:
@@ -479,7 +479,7 @@ class USBDevice(Device):


self.on_write(data=data) self.on_write(data=data)


except FtdiError, err:
except FtdiError as err:
raise CommError('Error writing to device: {0}'.format(str(err)), err) raise CommError('Error writing to device: {0}'.format(str(err)), err)


def read(self): def read(self):
@@ -494,7 +494,7 @@ class USBDevice(Device):
try: try:
ret = self._device.read_data(1) ret = self._device.read_data(1)


except (usb.core.USBError, FtdiError), err:
except (usb.core.USBError, FtdiError) as err:
raise CommError('Error reading from device: {0}'.format(str(err)), err) raise CommError('Error reading from device: {0}'.format(str(err)), err)


return ret return ret
@@ -543,7 +543,7 @@ class USBDevice(Device):
else: else:
time.sleep(0.01) time.sleep(0.01)


except (usb.core.USBError, FtdiError), err:
except (usb.core.USBError, FtdiError) as err:
raise CommError('Error reading from device: {0}'.format(str(err)), err) raise CommError('Error reading from device: {0}'.format(str(err)), err)


else: else:
@@ -660,7 +660,7 @@ class SerialDevice(Device):
else: else:
devices = serial.tools.list_ports.comports() devices = serial.tools.list_ports.comports()


except serial.SerialException, err:
except serial.SerialException as err:
raise CommError('Error enumerating serial devices: {0}'.format(str(err)), err) raise CommError('Error enumerating serial devices: {0}'.format(str(err)), err)


return devices return devices
@@ -731,7 +731,7 @@ class SerialDevice(Device):
# all issues with it. # all issues with it.
self._device.baudrate = baudrate self._device.baudrate = baudrate


except (serial.SerialException, ValueError, OSError), err:
except (serial.SerialException, ValueError, OSError) as err:
raise NoDeviceError('Error opening device on {0}.'.format(self._port), err) raise NoDeviceError('Error opening device on {0}.'.format(self._port), err)


else: else:
@@ -771,7 +771,7 @@ class SerialDevice(Device):
except serial.SerialTimeoutException: except serial.SerialTimeoutException:
pass pass


except serial.SerialException, err:
except serial.SerialException as err:
raise CommError('Error writing to device.', err) raise CommError('Error writing to device.', err)


else: else:
@@ -789,7 +789,7 @@ class SerialDevice(Device):
try: try:
ret = self._device.read(1) ret = self._device.read(1)


except serial.SerialException, err:
except serial.SerialException as err:
raise CommError('Error reading from device: {0}'.format(str(err)), err) raise CommError('Error reading from device: {0}'.format(str(err)), err)


return ret return ret
@@ -839,7 +839,7 @@ class SerialDevice(Device):
else: else:
time.sleep(0.01) time.sleep(0.01)


except (OSError, serial.SerialException), err:
except (OSError, serial.SerialException) as err:
raise CommError('Error reading from device: {0}'.format(str(err)), err) raise CommError('Error reading from device: {0}'.format(str(err)), err)


else: else:
@@ -1014,7 +1014,7 @@ class SocketDevice(Device):


self._id = '{0}:{1}'.format(self._host, self._port) self._id = '{0}:{1}'.format(self._host, self._port)


except socket.error, err:
except socket.error as err:
raise NoDeviceError('Error opening device at {0}:{1}'.format(self._host, self._port), err) raise NoDeviceError('Error opening device at {0}:{1}'.format(self._host, self._port), err)


else: else:
@@ -1067,7 +1067,7 @@ class SocketDevice(Device):


self.on_write(data=data) self.on_write(data=data)


except (SSL.Error, socket.error), err:
except (SSL.Error, socket.error) as err:
raise CommError('Error writing to device.', err) raise CommError('Error writing to device.', err)


return data_sent return data_sent
@@ -1087,7 +1087,7 @@ class SocketDevice(Device):
if (len(read_ready) != 0): if (len(read_ready) != 0):
data = self._device.recv(1) data = self._device.recv(1)


except socket.error, err:
except socket.error as err:
raise CommError('Error while reading from device: {0}'.format(str(err)), err) raise CommError('Error while reading from device: {0}'.format(str(err)), err)


return data return data
@@ -1143,10 +1143,10 @@ class SocketDevice(Device):
else: else:
time.sleep(0.01) time.sleep(0.01)


except socket.error, err:
except socket.error as err:
raise CommError('Error reading from device: {0}'.format(str(err)), err) raise CommError('Error reading from device: {0}'.format(str(err)), err)


except SSL.SysCallError, err:
except SSL.SysCallError as err:
errno, msg = err errno, msg = err
raise CommError('SSL error while reading from device: {0} ({1})'.format(msg, errno)) raise CommError('SSL error while reading from device: {0} ({1})'.format(msg, errno))


@@ -1175,7 +1175,7 @@ class SocketDevice(Device):
self._device.setblocking(0) self._device.setblocking(0)
while(self._device.recv(1)): while(self._device.recv(1)):
pass pass
except socket.error, err:
except socket.error as err:
pass pass
finally: finally:
self._device.setblocking(1) self._device.setblocking(1)
@@ -1213,7 +1213,7 @@ class SocketDevice(Device):


self._device = SSL.Connection(ctx, self._device) self._device = SSL.Connection(ctx, self._device)


except SSL.Error, err:
except SSL.Error as err:
raise CommError('Error setting up SSL connection.', err) raise CommError('Error setting up SSL connection.', err)


def _verify_ssl_callback(self, connection, x509, errnum, errdepth, ok): def _verify_ssl_callback(self, connection, x509, errnum, errdepth, ok):


+ 1
- 1
alarmdecoder/messages.py View File

@@ -163,7 +163,7 @@ class Message(BaseMessage):
self.check_zone = is_bit_set(15) self.check_zone = is_bit_set(15)
self.perimeter_only = is_bit_set(16) self.perimeter_only = is_bit_set(16)
self.system_fault = is_bit_set(17) self.system_fault = is_bit_set(17)
if self.bitfield[18] in PANEL_TYPES.keys():
if self.bitfield[18] in list(PANEL_TYPES.keys()):
self.panel_type = PANEL_TYPES[self.bitfield[18]] self.panel_type = PANEL_TYPES[self.bitfield[18]]
# pos 20-21 - Unused. # pos 20-21 - Unused.
self.text = alpha.strip('"') self.text = alpha.strip('"')


+ 4
- 4
alarmdecoder/zonetracking.py View File

@@ -258,7 +258,7 @@ class Zonetracker(object):
it = iter(self._zones_faulted) it = iter(self._zones_faulted)
try: try:
while not found_last_faulted: while not found_last_faulted:
z = it.next()
z = next(it)


if z == self._last_zone_fault: if z == self._last_zone_fault:
found_last_faulted = True found_last_faulted = True
@@ -271,7 +271,7 @@ class Zonetracker(object):
# between to our clear list. # between to our clear list.
try: try:
while not at_end and not found_current: while not at_end and not found_current:
z = it.next()
z = next(it)


if z == zone: if z == zone:
found_current = True found_current = True
@@ -289,7 +289,7 @@ class Zonetracker(object):


try: try:
while not found_current: while not found_current:
z = it.next()
z = next(it)


if z == zone: if z == zone:
found_current = True found_current = True
@@ -310,7 +310,7 @@ class Zonetracker(object):
""" """
zones = [] zones = []


for z in self._zones.keys():
for z in list(self._zones.keys()):
zones += [z] zones += [z]


for z in zones: for z in zones:


+ 3
- 3
examples/alarm_email.py View File

@@ -31,8 +31,8 @@ def main():
while True: while True:
time.sleep(1) time.sleep(1)


except Exception, ex:
print 'Exception:', ex
except Exception as ex:
print('Exception:', ex)


def handle_alarm(sender, **kwargs): def handle_alarm(sender, **kwargs):
""" """
@@ -58,7 +58,7 @@ def handle_alarm(sender, **kwargs):
s.sendmail(FROM_ADDRESS, TO_ADDRESS, msg.as_string()) s.sendmail(FROM_ADDRESS, TO_ADDRESS, msg.as_string())
s.quit() s.quit()


print 'sent alarm email:', text
print('sent alarm email:', text)


if __name__ == '__main__': if __name__ == '__main__':
main() main()

+ 3
- 3
examples/lrr_example.py View File

@@ -19,14 +19,14 @@ def main():
while True: while True:
time.sleep(1) time.sleep(1)


except Exception, ex:
print 'Exception:', ex
except Exception as ex:
print('Exception:', ex)


def handle_lrr_message(sender, message): def handle_lrr_message(sender, message):
""" """
Handles message events from the AlarmDecoder. Handles message events from the AlarmDecoder.
""" """
print sender, message.partition, message.event_type, message.event_data
print(sender, message.partition, message.event_type, message.event_data)


if __name__ == '__main__': if __name__ == '__main__':
main() main()

+ 3
- 3
examples/rf_device.py View File

@@ -29,8 +29,8 @@ def main():
while True: while True:
time.sleep(1) time.sleep(1)


except Exception, ex:
print 'Exception:', ex
except Exception as ex:
print('Exception:', ex)


def handle_rfx(sender, message): def handle_rfx(sender, message):
""" """
@@ -38,7 +38,7 @@ def handle_rfx(sender, message):
""" """
# Check for our target serial number and loop # Check for our target serial number and loop
if message.serial_number == RF_DEVICE_SERIAL_NUMBER and message.loop[0] == True: if message.serial_number == RF_DEVICE_SERIAL_NUMBER and message.loop[0] == True:
print message.serial_number, 'triggered loop #1'
print(message.serial_number, 'triggered loop #1')


if __name__ == '__main__': if __name__ == '__main__':
main() main()

+ 3
- 3
examples/serialport.py View File

@@ -23,14 +23,14 @@ def main():
while True: while True:
time.sleep(1) time.sleep(1)


except Exception, ex:
print 'Exception:', ex
except Exception as ex:
print('Exception:', ex)


def handle_message(sender, message): def handle_message(sender, message):
""" """
Handles message events from the AlarmDecoder. Handles message events from the AlarmDecoder.
""" """
print sender, message.raw
print(sender, message.raw)


if __name__ == '__main__': if __name__ == '__main__':
main() main()

+ 3
- 3
examples/socket_example.py View File

@@ -21,14 +21,14 @@ def main():
while True: while True:
time.sleep(1) time.sleep(1)


except Exception, ex:
print 'Exception:', ex
except Exception as ex:
print('Exception:', ex)


def handle_message(sender, message): def handle_message(sender, message):
""" """
Handles message events from the AlarmDecoder. Handles message events from the AlarmDecoder.
""" """
print sender, message.raw
print(sender, message.raw)


if __name__ == '__main__': if __name__ == '__main__':
main() main()

+ 3
- 3
examples/ssl_socket.py View File

@@ -35,14 +35,14 @@ def main():
while True: while True:
time.sleep(1) time.sleep(1)


except Exception, ex:
print 'Exception:', ex
except Exception as ex:
print('Exception:', ex)


def handle_message(sender, message): def handle_message(sender, message):
""" """
Handles message events from the AlarmDecoder. Handles message events from the AlarmDecoder.
""" """
print sender, message.raw
print(sender, message.raw)


if __name__ == '__main__': if __name__ == '__main__':
main() main()

+ 7
- 7
examples/usb_detection.py View File

@@ -21,12 +21,12 @@ def main():
while True: while True:
time.sleep(1) time.sleep(1)


except Exception, ex:
print 'Exception:', ex
except Exception as ex:
print('Exception:', ex)


finally: finally:
# Close all devices and stop detection. # Close all devices and stop detection.
for sn, device in __devices.iteritems():
for sn, device in __devices.items():
device.close() device.close()


USBDevice.stop_detection() USBDevice.stop_detection()
@@ -48,7 +48,7 @@ def handle_message(sender, message):
""" """
Handles message events from the AlarmDecoder. Handles message events from the AlarmDecoder.
""" """
print sender, message.raw
print(sender, message.raw)


def handle_attached(sender, device): def handle_attached(sender, device):
""" """
@@ -58,7 +58,7 @@ def handle_attached(sender, device):
dev = create_device(device) dev = create_device(device)
__devices[dev.id] = dev __devices[dev.id] = dev


print 'attached', dev.id
print('attached', dev.id)


def handle_detached(sender, device): def handle_detached(sender, device):
""" """
@@ -67,12 +67,12 @@ def handle_detached(sender, device):
vendor, product, sernum, ifcount, description = device vendor, product, sernum, ifcount, description = device


# Close and remove the device from our list. # Close and remove the device from our list.
if sernum in __devices.keys():
if sernum in list(__devices.keys()):
__devices[sernum].close() __devices[sernum].close()


del __devices[sernum] del __devices[sernum]


print 'detached', sernum
print('detached', sernum)


if __name__ == '__main__': if __name__ == '__main__':
main() main()

+ 3
- 3
examples/usb_device.py View File

@@ -16,14 +16,14 @@ def main():
while True: while True:
time.sleep(1) time.sleep(1)


except Exception, ex:
print 'Exception:', ex
except Exception as ex:
print('Exception:', ex)


def handle_message(sender, message): def handle_message(sender, message):
""" """
Handles message events from the AlarmDecoder. Handles message events from the AlarmDecoder.
""" """
print sender, message.raw
print(sender, message.raw)


if __name__ == '__main__': if __name__ == '__main__':
main() main()

+ 4
- 4
examples/virtual_zone_expander.py View File

@@ -47,14 +47,14 @@ def main():


time.sleep(1) time.sleep(1)


except Exception, ex:
print 'Exception:', ex
except Exception as ex:
print('Exception:', ex)


def handle_zone_fault(sender, zone): def handle_zone_fault(sender, zone):
""" """
Handles zone fault messages. Handles zone fault messages.
""" """
print 'zone faulted', zone
print('zone faulted', zone)


# Restore the zone # Restore the zone
sender.clear_zone(zone) sender.clear_zone(zone)
@@ -63,7 +63,7 @@ def handle_zone_restore(sender, zone):
""" """
Handles zone restore messages. Handles zone restore messages.
""" """
print 'zone cleared', zone
print('zone cleared', zone)


if __name__ == '__main__': if __name__ == '__main__':
main() main()

+ 35
- 35
test/test_ad2.py View File

@@ -184,7 +184,7 @@ class TestAlarmDecoder(TestCase):
self._decoder.open() self._decoder.open()
msg = self._decoder._handle_message('!REL:12,01,01') msg = self._decoder._handle_message('!REL:12,01,01')
self.assertIsInstance(msg, ExpanderMessage) self.assertIsInstance(msg, ExpanderMessage)
self.assertEquals(self._relay_changed, True)
self.assertEqual(self._relay_changed, True)


def test_rfx_message(self): def test_rfx_message(self):
msg = self._decoder._handle_message('!RFX:0180036,80') msg = self._decoder._handle_message('!RFX:0180036,80')
@@ -195,95 +195,95 @@ class TestAlarmDecoder(TestCase):
self._decoder.open() self._decoder.open()


msg = self._decoder._handle_message('!LRR:012,1,ALARM_PANIC') msg = self._decoder._handle_message('!LRR:012,1,ALARM_PANIC')
self.assertEquals(self._panicked, True)
self.assertEqual(self._panicked, True)


msg = self._decoder._handle_message('!LRR:012,1,CANCEL') msg = self._decoder._handle_message('!LRR:012,1,CANCEL')
self.assertEquals(self._panicked, False)
self.assertEqual(self._panicked, False)
self.assertIsInstance(msg, LRRMessage) self.assertIsInstance(msg, LRRMessage)


def test_config_message(self): def test_config_message(self):
self._decoder.open() self._decoder.open()


msg = self._decoder._handle_message('!CONFIG>ADDRESS=18&CONFIGBITS=ff00&LRR=N&EXP=NNNNN&REL=NNNN&MASK=ffffffff&DEDUPLICATE=N') msg = self._decoder._handle_message('!CONFIG>ADDRESS=18&CONFIGBITS=ff00&LRR=N&EXP=NNNNN&REL=NNNN&MASK=ffffffff&DEDUPLICATE=N')
self.assertEquals(self._decoder.address, 18)
self.assertEquals(self._decoder.configbits, int('ff00', 16))
self.assertEquals(self._decoder.address_mask, int('ffffffff', 16))
self.assertEquals(self._decoder.emulate_zone, [False for x in range(5)])
self.assertEquals(self._decoder.emulate_relay, [False for x in range(4)])
self.assertEquals(self._decoder.emulate_lrr, False)
self.assertEquals(self._decoder.deduplicate, False)
self.assertEqual(self._decoder.address, 18)
self.assertEqual(self._decoder.configbits, int('ff00', 16))
self.assertEqual(self._decoder.address_mask, int('ffffffff', 16))
self.assertEqual(self._decoder.emulate_zone, [False for x in range(5)])
self.assertEqual(self._decoder.emulate_relay, [False for x in range(4)])
self.assertEqual(self._decoder.emulate_lrr, False)
self.assertEqual(self._decoder.deduplicate, False)


self.assertEquals(self._got_config, True)
self.assertEqual(self._got_config, True)


def test_power_changed_event(self): def test_power_changed_event(self):
msg = self._decoder._handle_message('[0000000100000000----],000,[f707000600e5800c0c020000]," "') msg = self._decoder._handle_message('[0000000100000000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._power_changed, False) # Not set first time we hit it.
self.assertEqual(self._power_changed, False) # Not set first time we hit it.


msg = self._decoder._handle_message('[0000000000000000----],000,[f707000600e5800c0c020000]," "') msg = self._decoder._handle_message('[0000000000000000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._power_changed, False)
self.assertEqual(self._power_changed, False)


msg = self._decoder._handle_message('[0000000100000000----],000,[f707000600e5800c0c020000]," "') msg = self._decoder._handle_message('[0000000100000000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._power_changed, True)
self.assertEqual(self._power_changed, True)


def test_alarm_event(self): def test_alarm_event(self):
msg = self._decoder._handle_message('[0000000000100000----],000,[f707000600e5800c0c020000]," "') msg = self._decoder._handle_message('[0000000000100000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._alarmed, False) # Not set first time we hit it.
self.assertEqual(self._alarmed, False) # Not set first time we hit it.


msg = self._decoder._handle_message('[0000000000000000----],000,[f707000600e5800c0c020000]," "') msg = self._decoder._handle_message('[0000000000000000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._alarmed, False)
self.assertEquals(self._alarm_restored, True)
self.assertEqual(self._alarmed, False)
self.assertEqual(self._alarm_restored, True)


msg = self._decoder._handle_message('[0000000000100000----],000,[f707000600e5800c0c020000]," "') msg = self._decoder._handle_message('[0000000000100000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._alarmed, True)
self.assertEqual(self._alarmed, True)


def test_zone_bypassed_event(self): def test_zone_bypassed_event(self):
msg = self._decoder._handle_message('[0000001000000000----],000,[f707000600e5800c0c020000]," "') msg = self._decoder._handle_message('[0000001000000000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._bypassed, False) # Not set first time we hit it.
self.assertEqual(self._bypassed, False) # Not set first time we hit it.


msg = self._decoder._handle_message('[0000000000000000----],000,[f707000600e5800c0c020000]," "') msg = self._decoder._handle_message('[0000000000000000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._bypassed, False)
self.assertEqual(self._bypassed, False)


msg = self._decoder._handle_message('[0000001000000000----],000,[f707000600e5800c0c020000]," "') msg = self._decoder._handle_message('[0000001000000000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._bypassed, True)
self.assertEqual(self._bypassed, True)


def test_armed_away_event(self): def test_armed_away_event(self):
msg = self._decoder._handle_message('[0100000000000000----],000,[f707000600e5800c0c020000]," "') msg = self._decoder._handle_message('[0100000000000000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._armed, False) # Not set first time we hit it.
self.assertEqual(self._armed, False) # Not set first time we hit it.


msg = self._decoder._handle_message('[0000000000000000----],000,[f707000600e5800c0c020000]," "') msg = self._decoder._handle_message('[0000000000000000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._armed, False)
self.assertEqual(self._armed, False)


msg = self._decoder._handle_message('[0100000000000000----],000,[f707000600e5800c0c020000]," "') msg = self._decoder._handle_message('[0100000000000000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._armed, True)
self.assertEqual(self._armed, True)


self._armed = False self._armed = False


msg = self._decoder._handle_message('[0010000000000000----],000,[f707000600e5800c0c020000]," "') msg = self._decoder._handle_message('[0010000000000000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._armed, False) # Not set first time we hit it.
self.assertEqual(self._armed, False) # Not set first time we hit it.


msg = self._decoder._handle_message('[0000000000000000----],000,[f707000600e5800c0c020000]," "') msg = self._decoder._handle_message('[0000000000000000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._armed, False)
self.assertEqual(self._armed, False)


msg = self._decoder._handle_message('[0010000000000000----],000,[f707000600e5800c0c020000]," "') msg = self._decoder._handle_message('[0010000000000000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._armed, True)
self.assertEqual(self._armed, True)


def test_battery_low_event(self): def test_battery_low_event(self):
msg = self._decoder._handle_message('[0000000000010000----],000,[f707000600e5800c0c020000]," "') msg = self._decoder._handle_message('[0000000000010000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._battery, True)
self.assertEqual(self._battery, True)


# force the timeout to expire. # force the timeout to expire.
with patch.object(time, 'time', return_value=self._decoder._battery_status[1] + 35): with patch.object(time, 'time', return_value=self._decoder._battery_status[1] + 35):
msg = self._decoder._handle_message('[0000000000000000----],000,[f707000600e5800c0c020000]," "') msg = self._decoder._handle_message('[0000000000000000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._battery, False)
self.assertEqual(self._battery, False)


def test_fire_alarm_event(self): def test_fire_alarm_event(self):
msg = self._decoder._handle_message('[0000000000000100----],000,[f707000600e5800c0c020000]," "') msg = self._decoder._handle_message('[0000000000000100----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._fire, True)
self.assertEqual(self._fire, True)


# force the timeout to expire. # force the timeout to expire.
with patch.object(time, 'time', return_value=self._decoder._battery_status[1] + 35): with patch.object(time, 'time', return_value=self._decoder._battery_status[1] + 35):
msg = self._decoder._handle_message('[0000000000000000----],000,[f707000600e5800c0c020000]," "') msg = self._decoder._handle_message('[0000000000000000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._fire, False)
self.assertEqual(self._fire, False)


def test_hit_for_faults(self): def test_hit_for_faults(self):
self._decoder._handle_message('[0000000000000000----],000,[f707000600e5800c0c020000],"Hit * for faults "') self._decoder._handle_message('[0000000000000000----],000,[f707000600e5800c0c020000],"Hit * for faults "')
@@ -303,13 +303,13 @@ class TestAlarmDecoder(TestCase):


def test_zone_fault_and_restore(self): def test_zone_fault_and_restore(self):
self._decoder._on_read(self, data='[00010001000000000A--],003,[f70000051003000008020000000000],"FAULT 03 "') self._decoder._on_read(self, data='[00010001000000000A--],003,[f70000051003000008020000000000],"FAULT 03 "')
self.assertEquals(self._zone_faulted, 3)
self.assertEqual(self._zone_faulted, 3)


self._decoder._on_read(self, data='[00010001000000000A--],004,[f70000051003000008020000000000],"FAULT 04 "') self._decoder._on_read(self, data='[00010001000000000A--],004,[f70000051003000008020000000000],"FAULT 04 "')
self.assertEquals(self._zone_faulted, 4)
self.assertEqual(self._zone_faulted, 4)


self._decoder._on_read(self, data='[00010001000000000A--],005,[f70000051003000008020000000000],"FAULT 05 "') self._decoder._on_read(self, data='[00010001000000000A--],005,[f70000051003000008020000000000],"FAULT 05 "')
self.assertEquals(self._zone_faulted, 5)
self.assertEqual(self._zone_faulted, 5)


self._decoder._on_read(self, data='[00010001000000000A--],004,[f70000051003000008020000000000],"FAULT 04 "') self._decoder._on_read(self, data='[00010001000000000A--],004,[f70000051003000008020000000000],"FAULT 04 "')
self.assertEquals(self._zone_restored, 3)
self.assertEqual(self._zone_restored, 3)

+ 19
- 19
test/test_devices.py View File

@@ -37,19 +37,19 @@ class TestUSBDevice(TestCase):
with patch.object(Ftdi, 'find_all', return_value=[(0, 0, 'AD2', 1, 'AD2')]): with patch.object(Ftdi, 'find_all', return_value=[(0, 0, 'AD2', 1, 'AD2')]):
device = USBDevice.find() device = USBDevice.find()


self.assertEquals(device.interface, 'AD2')
self.assertEqual(device.interface, 'AD2')


def test_find_with_param(self): def test_find_with_param(self):
with patch.object(Ftdi, 'find_all', return_value=[(0, 0, 'AD2-1', 1, 'AD2'), (0, 0, 'AD2-2', 1, 'AD2')]): with patch.object(Ftdi, 'find_all', return_value=[(0, 0, 'AD2-1', 1, 'AD2'), (0, 0, 'AD2-2', 1, 'AD2')]):
device = USBDevice.find((0, 0, 'AD2-1', 1, 'AD2')) device = USBDevice.find((0, 0, 'AD2-1', 1, 'AD2'))
self.assertEquals(device.interface, 'AD2-1')
self.assertEqual(device.interface, 'AD2-1')


device = USBDevice.find((0, 0, 'AD2-2', 1, 'AD2')) device = USBDevice.find((0, 0, 'AD2-2', 1, 'AD2'))
self.assertEquals(device.interface, 'AD2-2')
self.assertEqual(device.interface, 'AD2-2')


def test_events(self): def test_events(self):
self.assertEquals(self._attached, False)
self.assertEquals(self._detached, False)
self.assertEqual(self._attached, False)
self.assertEqual(self._detached, False)


# this is ugly, but it works. # this is ugly, but it works.
with patch.object(USBDevice, 'find_all', return_value=[(0, 0, 'AD2-1', 1, 'AD2'), (0, 0, 'AD2-2', 1, 'AD2')]): with patch.object(USBDevice, 'find_all', return_value=[(0, 0, 'AD2-1', 1, 'AD2'), (0, 0, 'AD2-2', 1, 'AD2')]):
@@ -60,14 +60,14 @@ class TestUSBDevice(TestCase):
time.sleep(1) time.sleep(1)
USBDevice.stop_detection() USBDevice.stop_detection()


self.assertEquals(self._attached, True)
self.assertEquals(self._detached, True)
self.assertEqual(self._attached, True)
self.assertEqual(self._detached, True)


def test_find_all(self): def test_find_all(self):
with patch.object(USBDevice, 'find_all', return_value=[]) as mock: with patch.object(USBDevice, 'find_all', return_value=[]) as mock:
devices = USBDevice.find_all() devices = USBDevice.find_all()


self.assertEquals(devices, [])
self.assertEqual(devices, [])


def test_find_all_exception(self): def test_find_all_exception(self):
with patch.object(Ftdi, 'find_all', side_effect=[USBError('testing'), FtdiError]) as mock: with patch.object(Ftdi, 'find_all', side_effect=[USBError('testing'), FtdiError]) as mock:
@@ -80,16 +80,16 @@ class TestUSBDevice(TestCase):
def test_interface_serial_number(self): def test_interface_serial_number(self):
self._device.interface = 'AD2USB' self._device.interface = 'AD2USB'


self.assertEquals(self._device.interface, 'AD2USB')
self.assertEquals(self._device.serial_number, 'AD2USB')
self.assertEquals(self._device._device_number, 0)
self.assertEqual(self._device.interface, 'AD2USB')
self.assertEqual(self._device.serial_number, 'AD2USB')
self.assertEqual(self._device._device_number, 0)


def test_interface_index(self): def test_interface_index(self):
self._device.interface = 1 self._device.interface = 1


self.assertEquals(self._device.interface, 1)
self.assertEquals(self._device.serial_number, None)
self.assertEquals(self._device._device_number, 1)
self.assertEqual(self._device.interface, 1)
self.assertEqual(self._device.serial_number, None)
self.assertEqual(self._device._device_number, 1)


def test_open(self): def test_open(self):
self._device.interface = 'AD2USB' self._device.interface = 'AD2USB'
@@ -148,7 +148,7 @@ class TestUSBDevice(TestCase):
except StopIteration: except StopIteration:
pass pass


self.assertEquals(ret, "testing")
self.assertEqual(ret, "testing")


def test_read_line_timeout(self): def test_read_line_timeout(self):
with patch.object(self._device._device, 'read_data', return_value='a') as mock: with patch.object(self._device._device, 'read_data', return_value='a') as mock:
@@ -235,7 +235,7 @@ class TestSerialDevice(TestCase):
except StopIteration: except StopIteration:
pass pass


self.assertEquals(ret, "testing")
self.assertEqual(ret, "testing")


def test_read_line_timeout(self): def test_read_line_timeout(self):
with patch.object(self._device._device, 'read', return_value='a') as mock: with patch.object(self._device._device, 'read', return_value='a') as mock:
@@ -317,7 +317,7 @@ class TestSocketDevice(TestCase):
except StopIteration: except StopIteration:
pass pass


self.assertEquals(ret, "testing")
self.assertEqual(ret, "testing")


def test_read_line_timeout(self): def test_read_line_timeout(self):
with patch('socket.socket.fileno', return_value=1): with patch('socket.socket.fileno', return_value=1):
@@ -362,7 +362,7 @@ class TestSocketDevice(TestCase):
with patch.object(socket.socket, 'fileno', return_value=fileno): with patch.object(socket.socket, 'fileno', return_value=fileno):
try: try:
self._device.open(no_reader_thread=True) self._device.open(no_reader_thread=True)
except SSL.SysCallError, ex:
except SSL.SysCallError as ex:
pass pass


os.close(fileno) os.close(fileno)
@@ -387,7 +387,7 @@ class TestSocketDevice(TestCase):
with self.assertRaises(CommError): with self.assertRaises(CommError):
try: try:
self._device.open(no_reader_thread=True) self._device.open(no_reader_thread=True)
except SSL.SysCallError, ex:
except SSL.SysCallError as ex:
pass pass


os.close(fileno) os.close(fileno)


+ 4
- 4
test/test_messages.py View File

@@ -14,7 +14,7 @@ class TestMessages(TestCase):
def test_message_parse(self): def test_message_parse(self):
msg = Message('[0000000000000000----],001,[f707000600e5800c0c020000],"FAULT 1 "') msg = Message('[0000000000000000----],001,[f707000600e5800c0c020000],"FAULT 1 "')


self.assertEquals(msg.numeric_code, '001')
self.assertEqual(msg.numeric_code, '001')


def test_message_parse_fail(self): def test_message_parse_fail(self):
with self.assertRaises(InvalidMessageError): with self.assertRaises(InvalidMessageError):
@@ -23,7 +23,7 @@ class TestMessages(TestCase):
def test_expander_message_parse(self): def test_expander_message_parse(self):
msg = ExpanderMessage('!EXP:07,01,01') msg = ExpanderMessage('!EXP:07,01,01')


self.assertEquals(msg.address, 7)
self.assertEqual(msg.address, 7)


def test_expander_message_parse_fail(self): def test_expander_message_parse_fail(self):
with self.assertRaises(InvalidMessageError): with self.assertRaises(InvalidMessageError):
@@ -32,7 +32,7 @@ class TestMessages(TestCase):
def test_rf_message_parse(self): def test_rf_message_parse(self):
msg = RFMessage('!RFX:0180036,80') msg = RFMessage('!RFX:0180036,80')


self.assertEquals(msg.serial_number, '0180036')
self.assertEqual(msg.serial_number, '0180036')


def test_rf_message_parse_fail(self): def test_rf_message_parse_fail(self):
with self.assertRaises(InvalidMessageError): with self.assertRaises(InvalidMessageError):
@@ -41,7 +41,7 @@ class TestMessages(TestCase):
def test_lrr_message_parse(self): def test_lrr_message_parse(self):
msg = LRRMessage('!LRR:012,1,ARM_STAY') msg = LRRMessage('!LRR:012,1,ARM_STAY')


self.assertEquals(msg.event_type, 'ARM_STAY')
self.assertEqual(msg.event_type, 'ARM_STAY')


def test_lrr_message_parse_fail(self): def test_lrr_message_parse_fail(self):
with self.assertRaises(InvalidMessageError): with self.assertRaises(InvalidMessageError):


+ 6
- 6
test/test_zonetracking.py View File

@@ -39,7 +39,7 @@ class TestZonetracking(TestCase):
zone, msg = self._build_expander_message('!EXP:07,01,01') zone, msg = self._build_expander_message('!EXP:07,01,01')
self._zonetracker.update(msg) self._zonetracker.update(msg)


self.assertEquals(self._zonetracker._zones[zone].status, Zone.FAULT)
self.assertEqual(self._zonetracker._zones[zone].status, Zone.FAULT)
self.assertTrue(self._faulted) self.assertTrue(self._faulted)


def test_zone_restore(self): def test_zone_restore(self):
@@ -49,31 +49,31 @@ class TestZonetracking(TestCase):
zone, msg = self._build_expander_message('!EXP:07,01,00') zone, msg = self._build_expander_message('!EXP:07,01,00')
self._zonetracker.update(msg) self._zonetracker.update(msg)


self.assertEquals(self._zonetracker._zones[zone].status, Zone.CLEAR)
self.assertEqual(self._zonetracker._zones[zone].status, Zone.CLEAR)
self.assertTrue(self._restored) self.assertTrue(self._restored)


def test_message_ready(self): def test_message_ready(self):
msg = Message('[0000000000000010----],001,[f707000600e5800c0c020000]," "') msg = Message('[0000000000000010----],001,[f707000600e5800c0c020000]," "')
self._zonetracker.update(msg) self._zonetracker.update(msg)


self.assertEquals(len(self._zonetracker._zones_faulted), 1)
self.assertEqual(len(self._zonetracker._zones_faulted), 1)


msg = Message('[1000000000000000----],000,[f707000600e5800c0c020000]," "') msg = Message('[1000000000000000----],000,[f707000600e5800c0c020000]," "')
self._zonetracker.update(msg) self._zonetracker.update(msg)


self.assertEquals(len(self._zonetracker._zones_faulted), 0)
self.assertEqual(len(self._zonetracker._zones_faulted), 0)


def test_message_fault_text(self): def test_message_fault_text(self):
msg = Message('[0000000000000000----],001,[f707000600e5800c0c020000],"FAULT 1 "') msg = Message('[0000000000000000----],001,[f707000600e5800c0c020000],"FAULT 1 "')
self._zonetracker.update(msg) self._zonetracker.update(msg)


self.assertEquals(len(self._zonetracker._zones_faulted), 1)
self.assertEqual(len(self._zonetracker._zones_faulted), 1)


def test_ECP_failure(self): def test_ECP_failure(self):
msg = Message('[0000000000000010----],0bf,[f707000600e5800c0c020000],"CHECK 1 "') msg = Message('[0000000000000010----],0bf,[f707000600e5800c0c020000],"CHECK 1 "')
self._zonetracker.update(msg) self._zonetracker.update(msg)


self.assertEquals(self._zonetracker._zones['1'].status, Zone.CHECK)
self.assertEqual(self._zonetracker._zones['1'].status, Zone.CHECK)


def test_zone_restore_skip(self): def test_zone_restore_skip(self):
panel_messages = [ panel_messages = [


Loading…
Cancel
Save