Browse Source

More conversion.

pyserial_fix
Scott Petersen 9 years ago
parent
commit
b992ae1500
15 changed files with 293 additions and 256 deletions
  1. +3
    -2
      alarmdecoder/devices.py
  2. +3
    -3
      alarmdecoder/zonetracking.py
  3. +6
    -6
      bin/ad2-firmwareupload
  4. +4
    -5
      examples/alarm_email.py
  5. +3
    -3
      examples/lrr_example.py
  6. +3
    -3
      examples/rf_device.py
  7. +3
    -3
      examples/serialport.py
  8. +3
    -3
      examples/socket_example.py
  9. +3
    -3
      examples/ssl_socket.py
  10. +4
    -4
      examples/usb_detection.py
  11. +3
    -3
      examples/usb_device.py
  12. +7
    -4
      examples/virtual_zone_expander.py
  13. +6
    -2
      setup.py
  14. +47
    -45
      test/test_ad2.py
  15. +195
    -167
      test/test_devices.py

+ 3
- 2
alarmdecoder/devices.py View File

@@ -20,6 +20,7 @@ import threading
import serial
import serial.tools.list_ports
import socket
from builtins import bytes

from .util import CommError, TimeoutError, NoDeviceError, InvalidMessageError
from .event import event
@@ -825,7 +826,7 @@ class SerialDevice(Device):

try:
while timeout_event.reading:
buf = self._device.read(1)
buf = bytes(self._device.read(1), 'utf-8')

# NOTE: AD2SERIAL apparently sends down \xFF on boot.
if buf != b'' and buf != b"\xff":
@@ -1114,7 +1115,7 @@ class SocketDevice(Device):

try:
while timeout_event.reading:
buf = self._device.recv(1)
buf = bytes(self._device.recv(1), 'utf-8')

if buf != b'':
self._buffer += buf


+ 3
- 3
alarmdecoder/zonetracking.py View File

@@ -242,7 +242,7 @@ class Zonetracker(object):
it = iter(self._zones_faulted)
try:
while not found_last_faulted:
z = it.next()
z = next(it)

if z == self._last_zone_fault:
found_last_faulted = True
@@ -255,7 +255,7 @@ class Zonetracker(object):
# between to our clear list.
try:
while not at_end and not found_current:
z = it.next()
z = next(it)

if z == zone:
found_current = True
@@ -273,7 +273,7 @@ class Zonetracker(object):

try:
while not found_current:
z = it.next()
z = next(it)

if z == zone:
found_current = True


+ 6
- 6
bin/ad2-firmwareupload View File

@@ -15,10 +15,10 @@ def handle_firmware(stage):
sys.stdout.write('.')
sys.stdout.flush()
elif stage == alarmdecoder.util.Firmware.STAGE_BOOT:
if handle_firmware.wait_tick > 0: print ""
print "Rebooting device.."
if handle_firmware.wait_tick > 0: print("")
print("Rebooting device..")
elif stage == alarmdecoder.util.Firmware.STAGE_LOAD:
print 'Waiting for boot loader..'
print('Waiting for boot loader..')
elif stage == alarmdecoder.util.Firmware.STAGE_UPLOADING:
if handle_firmware.upload_tick == 0:
sys.stdout.write('Uploading firmware.')
@@ -29,7 +29,7 @@ def handle_firmware(stage):
sys.stdout.write('.')
sys.stdout.flush()
elif stage == alarmdecoder.util.Firmware.STAGE_DONE:
print "\r\nDone!"
print("\r\nDone!")

def main():
device = '/dev/ttyUSB0'
@@ -37,7 +37,7 @@ def main():
baudrate = 115200

if len(sys.argv) < 2:
print "Syntax: {0} <firmware> [device path or hostname:port] [baudrate]".format(sys.argv[0])
print("Syntax: {0} <firmware> [device path or hostname:port] [baudrate]".format(sys.argv[0]))
sys.exit(1)

firmware = sys.argv[1]
@@ -47,7 +47,7 @@ def main():
if len(sys.argv) > 3:
baudrate = sys.argv[3]

print "Flashing device: {0} - {2} baud\r\nFirmware: {1}".format(device, firmware, baudrate)
print("Flashing device: {0} - {2} baud\r\nFirmware: {1}".format(device, firmware, baudrate))

if ':' in device:
hostname, port = device.split(':')


+ 4
- 5
examples/alarm_email.py View File

@@ -31,16 +31,15 @@ def main():
while True:
time.sleep(1)

except Exception, ex:
print 'Exception:', ex
except Exception as ex:
print('Exception:', ex)

def handle_alarm(sender, **kwargs):
"""
Handles alarm events from the AlarmDecoder.
"""
status = kwargs.pop('status', None)
zone = kwargs.pop('zone', None)
text = "Alarm status: {0} - Zone {1}".format(status, zone)
text = "Alarm: Zone {0}".format(zone)

# Build the email message
msg = MIMEText(text)
@@ -58,7 +57,7 @@ def handle_alarm(sender, **kwargs):
s.sendmail(FROM_ADDRESS, TO_ADDRESS, msg.as_string())
s.quit()

print 'sent alarm email:', text
print('sent alarm email:', text)

if __name__ == '__main__':
main()

+ 3
- 3
examples/lrr_example.py View File

@@ -19,14 +19,14 @@ def main():
while True:
time.sleep(1)

except Exception, ex:
print 'Exception:', ex
except Exception as ex:
print('Exception:', ex)

def handle_lrr_message(sender, message):
"""
Handles message events from the AlarmDecoder.
"""
print sender, message.partition, message.event_type, message.event_data
print(sender, message.partition, message.event_type, message.event_data)

if __name__ == '__main__':
main()

+ 3
- 3
examples/rf_device.py View File

@@ -29,8 +29,8 @@ def main():
while True:
time.sleep(1)

except Exception, ex:
print 'Exception:', ex
except Exception as ex:
print('Exception:', ex)

def handle_rfx(sender, message):
"""
@@ -38,7 +38,7 @@ def handle_rfx(sender, message):
"""
# Check for our target serial number and loop
if message.serial_number == RF_DEVICE_SERIAL_NUMBER and message.loop[0] == True:
print message.serial_number, 'triggered loop #1'
print(message.serial_number, 'triggered loop #1')

if __name__ == '__main__':
main()

+ 3
- 3
examples/serialport.py View File

@@ -23,14 +23,14 @@ def main():
while True:
time.sleep(1)

except Exception, ex:
print 'Exception:', ex
except Exception as ex:
print('Exception:', ex)

def handle_message(sender, message):
"""
Handles message events from the AlarmDecoder.
"""
print sender, message.raw
print(sender, message.raw)

if __name__ == '__main__':
main()

+ 3
- 3
examples/socket_example.py View File

@@ -21,14 +21,14 @@ def main():
while True:
time.sleep(1)

except Exception, ex:
print 'Exception:', ex
except Exception as ex:
print('Exception:', ex)

def handle_message(sender, message):
"""
Handles message events from the AlarmDecoder.
"""
print sender, message.raw
print(sender, message.raw)

if __name__ == '__main__':
main()

+ 3
- 3
examples/ssl_socket.py View File

@@ -35,14 +35,14 @@ def main():
while True:
time.sleep(1)

except Exception, ex:
print 'Exception:', ex
except Exception as ex:
print('Exception:', ex)

def handle_message(sender, message):
"""
Handles message events from the AlarmDecoder.
"""
print sender, message.raw
print(sender, message.raw)

if __name__ == '__main__':
main()

+ 4
- 4
examples/usb_detection.py View File

@@ -21,7 +21,7 @@ def main():
while True:
time.sleep(1)

except Exception, ex:
except Exception as ex:
print 'Exception:', ex

finally:
@@ -48,7 +48,7 @@ def handle_message(sender, message):
"""
Handles message events from the AlarmDecoder.
"""
print sender, message.raw
print(sender, message.raw)

def handle_attached(sender, device):
"""
@@ -58,7 +58,7 @@ def handle_attached(sender, device):
dev = create_device(device)
__devices[dev.id] = dev

print 'attached', dev.id
print('attached', dev.id)

def handle_detached(sender, device):
"""
@@ -72,7 +72,7 @@ def handle_detached(sender, device):

del __devices[sernum]

print 'detached', sernum
print('detached', sernum)

if __name__ == '__main__':
main()

+ 3
- 3
examples/usb_device.py View File

@@ -16,14 +16,14 @@ def main():
while True:
time.sleep(1)

except Exception, ex:
print 'Exception:', ex
except Exception as ex:
print('Exception:', ex)

def handle_message(sender, message):
"""
Handles message events from the AlarmDecoder.
"""
print sender, message.raw
print(sender, message.raw)

if __name__ == '__main__':
main()

+ 7
- 4
examples/virtual_zone_expander.py View File

@@ -47,14 +47,17 @@ def main():

time.sleep(1)

except Exception, ex:
print 'Exception:', ex
except Exception as ex:
import sys
exc_type, exc_value, exc_traceback = sys.exc_info()
traceback.print_exc(exc_traceback)
print('Exception:', ex)

def handle_zone_fault(sender, zone):
"""
Handles zone fault messages.
"""
print 'zone faulted', zone
print('zone faulted', zone)

# Restore the zone
sender.clear_zone(zone)
@@ -63,7 +66,7 @@ def handle_zone_restore(sender, zone):
"""
Handles zone restore messages.
"""
print 'zone cleared', zone
print('zone cleared', zone)

if __name__ == '__main__':
main()

+ 6
- 2
setup.py View File

@@ -1,5 +1,6 @@
"""Setup script"""

import sys
from setuptools import setup

def readme():
@@ -8,6 +9,10 @@ def readme():
with open('README.rst') as readme_file:
return readme_file.read()

extra_requirements = []
if sys.version_info < (3,):
extra_requirements.append('future==0.14.3')

setup(name='alarmdecoder',
version='0.9.1',
description='Python interface for the AlarmDecoder (AD2) family '
@@ -30,9 +35,8 @@ setup(name='alarmdecoder',
license='MIT',
packages=['alarmdecoder', 'alarmdecoder.event'],
install_requires=[
'future==0.14.3'
'pyserial==2.7',
],
] + extra_requirements,
test_suite='nose.collector',
tests_require=['nose', 'mock'],
scripts=['bin/ad2-sslterm', 'bin/ad2-firmwareupload'],


+ 47
- 45
test/test_ad2.py View File

@@ -1,5 +1,7 @@
import time

from builtins import bytes

from unittest import TestCase
from mock import Mock, MagicMock, patch

@@ -133,11 +135,11 @@ class TestAlarmDecoder(TestCase):

def test_send(self):
self._decoder.send('test')
self._device.write.assert_called_with('test')
self._device.write.assert_called_with(b'test')

def test_get_config(self):
self._decoder.get_config()
self._device.write.assert_called_with("C\r")
self._device.write.assert_called_with(b"C\r")

def test_save_config(self):
self._decoder.save_config()
@@ -145,66 +147,66 @@ class TestAlarmDecoder(TestCase):

def test_reboot(self):
self._decoder.reboot()
self._device.write.assert_called_with('=')
self._device.write.assert_called_with(b'=')

def test_fault(self):
self._decoder.fault_zone(1)
self._device.write.assert_called_with("L{0:02}{1}\r".format(1, 1))
self._device.write.assert_called_with(bytes("L{0:02}{1}\r".format(1, 1), 'utf-8'))

def test_fault_wireproblem(self):
self._decoder.fault_zone(1, simulate_wire_problem=True)
self._device.write.assert_called_with("L{0:02}{1}\r".format(1, 2))
self._device.write.assert_called_with(bytes("L{0:02}{1}\r".format(1, 2), 'utf-8'))

def test_clear_zone(self):
self._decoder.clear_zone(1)
self._device.write.assert_called_with("L{0:02}0\r".format(1))
self._device.write.assert_called_with(bytes("L{0:02}0\r".format(1), 'utf-8'))

def test_message(self):
msg = self._decoder._handle_message('[0000000000000000----],000,[f707000600e5800c0c020000]," "')
msg = self._decoder._handle_message(b'[0000000000000000----],000,[f707000600e5800c0c020000]," "')
self.assertIsInstance(msg, Message)

self._decoder._on_read(self, data='[0000000000000000----],000,[f707000600e5800c0c020000]," "')
self._decoder._on_read(self, data=b'[0000000000000000----],000,[f707000600e5800c0c020000]," "')
self.assertTrue(self._message_received)

def test_message_kpm(self):
msg = self._decoder._handle_message('!KPM:[0000000000000000----],000,[f707000600e5800c0c020000]," "')
msg = self._decoder._handle_message(b'!KPM:[0000000000000000----],000,[f707000600e5800c0c020000]," "')
self.assertIsInstance(msg, Message)

self._decoder._on_read(self, data='[0000000000000000----],000,[f707000600e5800c0c020000]," "')
self._decoder._on_read(self, data=b'[0000000000000000----],000,[f707000600e5800c0c020000]," "')
self.assertTrue(self._message_received)

def test_expander_message(self):
msg = self._decoder._handle_message('!EXP:07,01,01')
msg = self._decoder._handle_message(b'!EXP:07,01,01')
self.assertIsInstance(msg, ExpanderMessage)

self._decoder._on_read(self, data='!EXP:07,01,01')
self._decoder._on_read(self, data=b'!EXP:07,01,01')
self.assertTrue(self._expander_message_received)

def test_relay_message(self):
self._decoder.open()
msg = self._decoder._handle_message('!REL:12,01,01')
msg = self._decoder._handle_message(b'!REL:12,01,01')
self.assertIsInstance(msg, ExpanderMessage)
self.assertEquals(self._relay_changed, True)

def test_rfx_message(self):
msg = self._decoder._handle_message('!RFX:0180036,80')
msg = self._decoder._handle_message(b'!RFX:0180036,80')
self.assertIsInstance(msg, RFMessage)
self.assertTrue(self._rfx_message_received)

def test_panic(self):
self._decoder.open()

msg = self._decoder._handle_message('!LRR:012,1,ALARM_PANIC')
msg = self._decoder._handle_message(b'!LRR:012,1,ALARM_PANIC')
self.assertEquals(self._panicked, True)

msg = self._decoder._handle_message('!LRR:012,1,CANCEL')
msg = self._decoder._handle_message(b'!LRR:012,1,CANCEL')
self.assertEquals(self._panicked, False)
self.assertIsInstance(msg, LRRMessage)

def test_config_message(self):
self._decoder.open()

msg = self._decoder._handle_message('!CONFIG>ADDRESS=18&CONFIGBITS=ff00&LRR=N&EXP=NNNNN&REL=NNNN&MASK=ffffffff&DEDUPLICATE=N')
msg = self._decoder._handle_message(b'!CONFIG>ADDRESS=18&CONFIGBITS=ff00&LRR=N&EXP=NNNNN&REL=NNNN&MASK=ffffffff&DEDUPLICATE=N')
self.assertEquals(self._decoder.address, 18)
self.assertEquals(self._decoder.configbits, int('ff00', 16))
self.assertEquals(self._decoder.address_mask, int('ffffffff', 16))
@@ -216,100 +218,100 @@ class TestAlarmDecoder(TestCase):
self.assertEquals(self._got_config, True)

def test_power_changed_event(self):
msg = self._decoder._handle_message('[0000000100000000----],000,[f707000600e5800c0c020000]," "')
msg = self._decoder._handle_message(b'[0000000100000000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._power_changed, False) # Not set first time we hit it.

msg = self._decoder._handle_message('[0000000000000000----],000,[f707000600e5800c0c020000]," "')
msg = self._decoder._handle_message(b'[0000000000000000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._power_changed, False)

msg = self._decoder._handle_message('[0000000100000000----],000,[f707000600e5800c0c020000]," "')
msg = self._decoder._handle_message(b'[0000000100000000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._power_changed, True)

def test_alarm_event(self):
msg = self._decoder._handle_message('[0000000000100000----],000,[f707000600e5800c0c020000]," "')
msg = self._decoder._handle_message(b'[0000000000100000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._alarmed, False) # Not set first time we hit it.

msg = self._decoder._handle_message('[0000000000000000----],000,[f707000600e5800c0c020000]," "')
msg = self._decoder._handle_message(b'[0000000000000000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._alarmed, False)
self.assertEquals(self._alarm_restored, True)

msg = self._decoder._handle_message('[0000000000100000----],000,[f707000600e5800c0c020000]," "')
msg = self._decoder._handle_message(b'[0000000000100000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._alarmed, True)

def test_zone_bypassed_event(self):
msg = self._decoder._handle_message('[0000001000000000----],000,[f707000600e5800c0c020000]," "')
msg = self._decoder._handle_message(b'[0000001000000000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._bypassed, False) # Not set first time we hit it.

msg = self._decoder._handle_message('[0000000000000000----],000,[f707000600e5800c0c020000]," "')
msg = self._decoder._handle_message(b'[0000000000000000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._bypassed, False)

msg = self._decoder._handle_message('[0000001000000000----],000,[f707000600e5800c0c020000]," "')
msg = self._decoder._handle_message(b'[0000001000000000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._bypassed, True)

def test_armed_away_event(self):
msg = self._decoder._handle_message('[0100000000000000----],000,[f707000600e5800c0c020000]," "')
msg = self._decoder._handle_message(b'[0100000000000000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._armed, False) # Not set first time we hit it.

msg = self._decoder._handle_message('[0000000000000000----],000,[f707000600e5800c0c020000]," "')
msg = self._decoder._handle_message(b'[0000000000000000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._armed, False)

msg = self._decoder._handle_message('[0100000000000000----],000,[f707000600e5800c0c020000]," "')
msg = self._decoder._handle_message(b'[0100000000000000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._armed, True)

self._armed = False

msg = self._decoder._handle_message('[0010000000000000----],000,[f707000600e5800c0c020000]," "')
msg = self._decoder._handle_message(b'[0010000000000000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._armed, False) # Not set first time we hit it.

msg = self._decoder._handle_message('[0000000000000000----],000,[f707000600e5800c0c020000]," "')
msg = self._decoder._handle_message(b'[0000000000000000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._armed, False)

msg = self._decoder._handle_message('[0010000000000000----],000,[f707000600e5800c0c020000]," "')
msg = self._decoder._handle_message(b'[0010000000000000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._armed, True)

def test_battery_low_event(self):
msg = self._decoder._handle_message('[0000000000010000----],000,[f707000600e5800c0c020000]," "')
msg = self._decoder._handle_message(b'[0000000000010000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._battery, True)

# force the timeout to expire.
with patch.object(time, 'time', return_value=self._decoder._battery_status[1] + 35):
msg = self._decoder._handle_message('[0000000000000000----],000,[f707000600e5800c0c020000]," "')
msg = self._decoder._handle_message(b'[0000000000000000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._battery, False)

def test_fire_alarm_event(self):
msg = self._decoder._handle_message('[0000000000000100----],000,[f707000600e5800c0c020000]," "')
msg = self._decoder._handle_message(b'[0000000000000100----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._fire, True)

# force the timeout to expire.
with patch.object(time, 'time', return_value=self._decoder._battery_status[1] + 35):
msg = self._decoder._handle_message('[0000000000000000----],000,[f707000600e5800c0c020000]," "')
msg = self._decoder._handle_message(b'[0000000000000000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._fire, False)

def test_hit_for_faults(self):
self._decoder._handle_message('[0000000000000000----],000,[f707000600e5800c0c020000],"Hit * for faults "')
self._decoder._handle_message(b'[0000000000000000----],000,[f707000600e5800c0c020000],"Hit * for faults "')

self._decoder._device.write.assert_called_with('*')
self._decoder._device.write.assert_called_with(b'*')

def test_sending_received(self):
self._decoder._on_read(self, data='!Sending.done')
self._decoder._on_read(self, data=b'!Sending.done')
self.assertTrue(self._sending_received_status)

self._decoder._on_read(self, data='!Sending.....done')
self._decoder._on_read(self, data=b'!Sending.....done')
self.assertFalse(self._sending_received_status)

def test_boot(self):
self._decoder._on_read(self, data='!Ready')
self._decoder._on_read(self, data=b'!Ready')
self.assertTrue(self._on_boot_received)

def test_zone_fault_and_restore(self):
self._decoder._on_read(self, data='[00010001000000000A--],003,[f70000051003000008020000000000],"FAULT 03 "')
self._decoder._on_read(self, data=b'[00010001000000000A--],003,[f70000051003000008020000000000],"FAULT 03 "')
self.assertEquals(self._zone_faulted, 3)

self._decoder._on_read(self, data='[00010001000000000A--],004,[f70000051003000008020000000000],"FAULT 04 "')
self._decoder._on_read(self, data=b'[00010001000000000A--],004,[f70000051003000008020000000000],"FAULT 04 "')
self.assertEquals(self._zone_faulted, 4)

self._decoder._on_read(self, data='[00010001000000000A--],005,[f70000051003000008020000000000],"FAULT 05 "')
self._decoder._on_read(self, data=b'[00010001000000000A--],005,[f70000051003000008020000000000],"FAULT 05 "')
self.assertEquals(self._zone_faulted, 5)

self._decoder._on_read(self, data='[00010001000000000A--],004,[f70000051003000008020000000000],"FAULT 04 "')
self._decoder._on_read(self, data=b'[00010001000000000A--],004,[f70000051003000008020000000000],"FAULT 04 "')
self.assertEquals(self._zone_restored, 3)

+ 195
- 167
test/test_devices.py View File

@@ -1,168 +1,32 @@
from unittest import TestCase
from mock import Mock, MagicMock, patch
from serial import Serial, SerialException
from pyftdi.pyftdi.ftdi import Ftdi, FtdiError
from usb.core import USBError, Device as USBCoreDevice
import socket
import time
import tempfile
import os
from OpenSSL import SSL, crypto
from alarmdecoder.devices import USBDevice, SerialDevice, SocketDevice
from alarmdecoder.util import NoDeviceError, CommError, TimeoutError


class TestUSBDevice(TestCase):
def setUp(self):
self._device = USBDevice()
self._device._device = Mock(spec=Ftdi)
self._device._device.usb_dev = Mock(spec=USBCoreDevice)
self._device._device.usb_dev.bus = 0
self._device._device.usb_dev.address = 0

self._attached = False
self._detached = False

def tearDown(self):
self._device.close()

def attached_event(self, sender, *args, **kwargs):
self._attached = True

def detached_event(self, sender, *args, **kwargs):
self._detached = True

def test_find_default_param(self):
with patch.object(Ftdi, 'find_all', return_value=[(0, 0, 'AD2', 1, 'AD2')]):
device = USBDevice.find()

self.assertEquals(device.interface, 'AD2')

def test_find_with_param(self):
with patch.object(Ftdi, 'find_all', return_value=[(0, 0, 'AD2-1', 1, 'AD2'), (0, 0, 'AD2-2', 1, 'AD2')]):
device = USBDevice.find((0, 0, 'AD2-1', 1, 'AD2'))
self.assertEquals(device.interface, 'AD2-1')

device = USBDevice.find((0, 0, 'AD2-2', 1, 'AD2'))
self.assertEquals(device.interface, 'AD2-2')

def test_events(self):
self.assertEquals(self._attached, False)
self.assertEquals(self._detached, False)

# this is ugly, but it works.
with patch.object(USBDevice, 'find_all', return_value=[(0, 0, 'AD2-1', 1, 'AD2'), (0, 0, 'AD2-2', 1, 'AD2')]):
USBDevice.start_detection(on_attached=self.attached_event, on_detached=self.detached_event)

with patch.object(USBDevice, 'find_all', return_value=[(0, 0, 'AD2-2', 1, 'AD2')]):
USBDevice.find_all()
time.sleep(1)
USBDevice.stop_detection()

self.assertEquals(self._attached, True)
self.assertEquals(self._detached, True)

def test_find_all(self):
with patch.object(USBDevice, 'find_all', return_value=[]) as mock:
devices = USBDevice.find_all()

self.assertEquals(devices, [])

def test_find_all_exception(self):
with patch.object(Ftdi, 'find_all', side_effect=[USBError('testing'), FtdiError]) as mock:
with self.assertRaises(CommError):
devices = USBDevice.find_all()

with self.assertRaises(CommError):
devices = USBDevice.find_all()

def test_interface_serial_number(self):
self._device.interface = 'AD2USB'

self.assertEquals(self._device.interface, 'AD2USB')
self.assertEquals(self._device.serial_number, 'AD2USB')
self.assertEquals(self._device._device_number, 0)

def test_interface_index(self):
self._device.interface = 1

self.assertEquals(self._device.interface, 1)
self.assertEquals(self._device.serial_number, None)
self.assertEquals(self._device._device_number, 1)

def test_open(self):
self._device.interface = 'AD2USB'

with patch.object(self._device._device, 'open') as mock:
self._device.open(no_reader_thread=True)

mock.assert_any_calls()

def test_open_failed(self):
self._device.interface = 'AD2USB'

with patch.object(self._device._device, 'open', side_effect=[USBError('testing'), FtdiError]):
with self.assertRaises(NoDeviceError):
self._device.open(no_reader_thread=True)

with self.assertRaises(NoDeviceError):
self._device.open(no_reader_thread=True)

def test_write(self):
self._device.interface = 'AD2USB'
self._device.open(no_reader_thread=True)

with patch.object(self._device._device, 'write_data') as mock:
self._device.write('test')

mock.assert_called_with('test')

def test_write_exception(self):
with patch.object(self._device._device, 'write_data', side_effect=FtdiError):
with self.assertRaises(CommError):
self._device.write('test')

def test_read(self):
self._device.interface = 'AD2USB'
self._device.open(no_reader_thread=True)

with patch.object(self._device._device, 'read_data') as mock:
self._device.read()

mock.assert_called_with(1)

def test_read_exception(self):
with patch.object(self._device._device, 'read_data', side_effect=[USBError('testing'), FtdiError]):
with self.assertRaises(CommError):
self._device.read()

with self.assertRaises(CommError):
self._device.read()
from unittest import TestCase
from mock import Mock, MagicMock, patch
from serial import Serial, SerialException

def test_read_line(self):
with patch.object(self._device._device, 'read_data', side_effect=list("testing\r\n")):
ret = None
try:
ret = self._device.read_line()
except StopIteration:
pass
from alarmdecoder.devices import USBDevice, SerialDevice, SocketDevice
from alarmdecoder.util import NoDeviceError, CommError, TimeoutError

self.assertEquals(ret, "testing")
# Optional FTDI tests
try:
from pyftdi.pyftdi.ftdi import Ftdi, FtdiError
from usb.core import USBError, Device as USBCoreDevice

def test_read_line_timeout(self):
with patch.object(self._device._device, 'read_data', return_value='a') as mock:
with self.assertRaises(TimeoutError):
self._device.read_line(timeout=0.1)
have_pyftdi = True

self.assertIn('a', self._device._buffer)
except ImportError:
have_pyftdi = False

def test_read_line_exception(self):
with patch.object(self._device._device, 'read_data', side_effect=[USBError('testing'), FtdiError]):
with self.assertRaises(CommError):
self._device.read_line()
# Optional SSL tests
try:
from OpenSSL import SSL, crypto

with self.assertRaises(CommError):
self._device.read_line()
have_openssl = True
except ImportError:
have_openssl = False


class TestSerialDevice(TestCase):
@@ -203,14 +67,14 @@ class TestSerialDevice(TestCase):
self._device.open(no_reader_thread=True)

with patch.object(self._device._device, 'write') as mock:
self._device.write('test')
self._device.write(b'test')

mock.assert_called_with('test')
mock.assert_called_with(b'test')

def test_write_exception(self):
with patch.object(self._device._device, 'write', side_effect=SerialException):
with self.assertRaises(CommError):
self._device.write('test')
self._device.write(b'test')

def test_read(self):
self._device.interface = '/dev/ttyS0'
@@ -234,14 +98,14 @@ class TestSerialDevice(TestCase):
except StopIteration:
pass

self.assertEquals(ret, "testing")
self.assertEquals(ret, b"testing")

def test_read_line_timeout(self):
with patch.object(self._device._device, 'read', return_value='a') as mock:
with self.assertRaises(TimeoutError):
self._device.read_line(timeout=0.1)

self.assertIn('a', self._device._buffer)
self.assertIn('a', self._device._buffer.decode('utf-8'))

def test_read_line_exception(self):
with patch.object(self._device._device, 'read', side_effect=[OSError, SerialException]):
@@ -278,14 +142,18 @@ class TestSocketDevice(TestCase):
self._device.open(no_reader_thread=True)

with patch.object(socket.socket, 'send') as mock:
self._device.write('test')
self._device.write(b'test')

mock.assert_called_with('test')
mock.assert_called_with(b'test')

def test_write_exception(self):
with patch.object(self._device._device, 'send', side_effect=[SSL.Error, socket.error]):
side_effects = [socket.error]
if (have_openssl):
side_effects.append(SSL.Error)

with patch.object(self._device._device, 'send', side_effect=side_effects):
with self.assertRaises(CommError):
self._device.write('test')
self._device.write(b'test')

def test_read(self):
with patch.object(socket.socket, '__init__', return_value=None):
@@ -310,14 +178,14 @@ class TestSocketDevice(TestCase):
except StopIteration:
pass

self.assertEquals(ret, "testing")
self.assertEquals(ret, b"testing")

def test_read_line_timeout(self):
with patch.object(self._device._device, 'recv', return_value='a') as mock:
with self.assertRaises(TimeoutError):
self._device.read_line(timeout=0.1)

self.assertIn('a', self._device._buffer)
self.assertIn('a', self._device._buffer.decode('utf-8'))

def test_read_line_exception(self):
with patch.object(self._device._device, 'recv', side_effect=socket.error):
@@ -328,6 +196,9 @@ class TestSocketDevice(TestCase):
self._device.read_line()

def test_ssl(self):
if not have_openssl:
return

ssl_key = crypto.PKey()
ssl_key.generate_key(crypto.TYPE_RSA, 2048)
ssl_cert = crypto.X509()
@@ -351,7 +222,7 @@ class TestSocketDevice(TestCase):
with patch.object(socket.socket, 'fileno', return_value=fileno):
try:
self._device.open(no_reader_thread=True)
except SSL.SysCallError, ex:
except SSL.SysCallError as ex:
pass

os.close(fileno)
@@ -361,6 +232,9 @@ class TestSocketDevice(TestCase):
self.assertIsInstance(self._device._device, SSL.Connection)

def test_ssl_exception(self):
if not have_openssl:
return

self._device.ssl = True
self._device.ssl_key = 'None'
self._device.ssl_certificate = 'None'
@@ -376,8 +250,162 @@ class TestSocketDevice(TestCase):
with self.assertRaises(CommError):
try:
self._device.open(no_reader_thread=True)
except SSL.SysCallError, ex:
except SSL.SysCallError as ex:
pass

os.close(fileno)
os.unlink(path)


if have_pyftdi:
class TestUSBDevice(TestCase):
def setUp(self):
self._device = USBDevice()
self._device._device = Mock(spec=Ftdi)
self._device._device.usb_dev = Mock(spec=USBCoreDevice)
self._device._device.usb_dev.bus = 0
self._device._device.usb_dev.address = 0

self._attached = False
self._detached = False

def tearDown(self):
self._device.close()

def attached_event(self, sender, *args, **kwargs):
self._attached = True

def detached_event(self, sender, *args, **kwargs):
self._detached = True

def test_find_default_param(self):
with patch.object(Ftdi, 'find_all', return_value=[(0, 0, 'AD2', 1, 'AD2')]):
device = USBDevice.find()

self.assertEquals(device.interface, 'AD2')

def test_find_with_param(self):
with patch.object(Ftdi, 'find_all', return_value=[(0, 0, 'AD2-1', 1, 'AD2'), (0, 0, 'AD2-2', 1, 'AD2')]):
device = USBDevice.find((0, 0, 'AD2-1', 1, 'AD2'))
self.assertEquals(device.interface, 'AD2-1')

device = USBDevice.find((0, 0, 'AD2-2', 1, 'AD2'))
self.assertEquals(device.interface, 'AD2-2')

def test_events(self):
self.assertEquals(self._attached, False)
self.assertEquals(self._detached, False)

# this is ugly, but it works.
with patch.object(USBDevice, 'find_all', return_value=[(0, 0, 'AD2-1', 1, 'AD2'), (0, 0, 'AD2-2', 1, 'AD2')]):
USBDevice.start_detection(on_attached=self.attached_event, on_detached=self.detached_event)

with patch.object(USBDevice, 'find_all', return_value=[(0, 0, 'AD2-2', 1, 'AD2')]):
USBDevice.find_all()
time.sleep(1)
USBDevice.stop_detection()

self.assertEquals(self._attached, True)
self.assertEquals(self._detached, True)

def test_find_all(self):
with patch.object(USBDevice, 'find_all', return_value=[]) as mock:
devices = USBDevice.find_all()

self.assertEquals(devices, [])

def test_find_all_exception(self):
with patch.object(Ftdi, 'find_all', side_effect=[USBError('testing'), FtdiError]) as mock:
with self.assertRaises(CommError):
devices = USBDevice.find_all()

with self.assertRaises(CommError):
devices = USBDevice.find_all()

def test_interface_serial_number(self):
self._device.interface = 'AD2USB'

self.assertEquals(self._device.interface, 'AD2USB')
self.assertEquals(self._device.serial_number, 'AD2USB')
self.assertEquals(self._device._device_number, 0)

def test_interface_index(self):
self._device.interface = 1

self.assertEquals(self._device.interface, 1)
self.assertEquals(self._device.serial_number, None)
self.assertEquals(self._device._device_number, 1)

def test_open(self):
self._device.interface = 'AD2USB'

with patch.object(self._device._device, 'open') as mock:
self._device.open(no_reader_thread=True)

mock.assert_any_calls()

def test_open_failed(self):
self._device.interface = 'AD2USB'

with patch.object(self._device._device, 'open', side_effect=[USBError('testing'), FtdiError]):
with self.assertRaises(NoDeviceError):
self._device.open(no_reader_thread=True)

with self.assertRaises(NoDeviceError):
self._device.open(no_reader_thread=True)

def test_write(self):
self._device.interface = 'AD2USB'
self._device.open(no_reader_thread=True)

with patch.object(self._device._device, 'write_data') as mock:
self._device.write(b'test')

mock.assert_called_with(b'test')

def test_write_exception(self):
with patch.object(self._device._device, 'write_data', side_effect=FtdiError):
with self.assertRaises(CommError):
self._device.write(b'test')

def test_read(self):
self._device.interface = 'AD2USB'
self._device.open(no_reader_thread=True)

with patch.object(self._device._device, 'read_data') as mock:
self._device.read()

mock.assert_called_with(1)

def test_read_exception(self):
with patch.object(self._device._device, 'read_data', side_effect=[USBError('testing'), FtdiError]):
with self.assertRaises(CommError):
self._device.read()

with self.assertRaises(CommError):
self._device.read()

def test_read_line(self):
with patch.object(self._device._device, 'read_data', side_effect=list("testing\r\n")):
ret = None
try:
ret = self._device.read_line()
except StopIteration:
pass

self.assertEquals(ret, b"testing")

def test_read_line_timeout(self):
with patch.object(self._device._device, 'read_data', return_value='a') as mock:
with self.assertRaises(TimeoutError):
self._device.read_line(timeout=0.1)

self.assertIn('a', self._device._buffer)

def test_read_line_exception(self):
with patch.object(self._device._device, 'read_data', side_effect=[USBError('testing'), FtdiError]):
with self.assertRaises(CommError):
self._device.read_line()

with self.assertRaises(CommError):
self._device.read_line()

Loading…
Cancel
Save