diff --git a/pyad2/__init__.py b/pyad2/__init__.py index 707a2f5..3c15c6a 100644 --- a/pyad2/__init__.py +++ b/pyad2/__init__.py @@ -1,4 +1,4 @@ -import ad2 +from ad2 import AD2 import devices import util import messages diff --git a/pyad2/ad2.py b/pyad2/ad2.py index 7ca5532..5ae6fe5 100644 --- a/pyad2/ad2.py +++ b/pyad2/ad2.py @@ -8,167 +8,10 @@ import time import threading from .event import event -from .devices import USBDevice from .util import CommError, NoDeviceError from .messages import Message, ExpanderMessage, RFMessage, LRRMessage from .zonetracking import Zonetracker -class AD2Factory(object): - """ - Factory for creation of AD2USB devices as well as provides attach/detach events." - """ - - # Factory events - on_attached = event.Event('Called when an AD2USB device has been detected.') - on_detached = event.Event('Called when an AD2USB device has been removed.') - - __devices = [] - - @classmethod - def find_all(cls): - """ - Returns all AD2USB devices located on the system. - - :returns: list of devices found - :raises: CommError - """ - cls.__devices = USBDevice.find_all() - - return cls.__devices - - @classmethod - def devices(cls): - """ - Returns a cached list of AD2USB devices located on the system. - - :returns: cached list of devices found. - """ - return cls.__devices - - @classmethod - def create(cls, device=None): - """ - Factory method that returns the requested AD2USB device, or the first device. - - :param device: Tuple describing the USB device to open, as returned by find_all(). - :type device: tuple - - :returns: AD2USB object utilizing the specified device. - :raises: NoDeviceError - """ - cls.find_all() - - if len(cls.__devices) == 0: - raise NoDeviceError('No AD2USB devices present.') - - if device is None: - device = cls.__devices[0] - - vendor, product, sernum, ifcount, description = device - device = USBDevice(interface=sernum) - - return AD2(device) - - def __init__(self, attached_event=None, detached_event=None): - """ - Constructor - - :param attached_event: Event to trigger when a device is attached. - :type attached_event: function - :param detached_event: Event to trigger when a device is detached. - :type detached_event: function - """ - self._detect_thread = AD2Factory.DetectThread(self) - - if attached_event: - self.on_attached += attached_event - - if detached_event: - self.on_detached += detached_event - - AD2Factory.find_all() - - self.start() - - def close(self): - """ - Clean up and shut down. - """ - self.stop() - - def start(self): - """ - Starts the detection thread, if not already running. - """ - if not self._detect_thread.is_alive(): - self._detect_thread.start() - - def stop(self): - """ - Stops the detection thread. - """ - self._detect_thread.stop() - - def get_device(self, device=None): - """ - Factory method that returns the requested AD2USB device, or the first device. - - :param device: Tuple describing the USB device to open, as returned by find_all(). - :type device: tuple - """ - return AD2Factory.create(device) - - class DetectThread(threading.Thread): - """ - Thread that handles detection of added/removed devices. - """ - def __init__(self, factory): - """ - Constructor - - :param factory: AD2Factory object to use with the thread. - :type factory: AD2Factory - """ - threading.Thread.__init__(self) - - self._factory = factory - self._running = False - - def stop(self): - """ - Stops the thread. - """ - self._running = False - - def run(self): - """ - The actual detection process. - """ - self._running = True - - last_devices = set() - - while self._running: - try: - AD2Factory.find_all() - - current_devices = set(AD2Factory.devices()) - new_devices = [d for d in current_devices if d not in last_devices] - removed_devices = [d for d in last_devices if d not in current_devices] - last_devices = current_devices - - for d in new_devices: - self._factory.on_attached(device=d) - - for d in removed_devices: - self._factory.on_detached(device=d) - - except CommError, err: - pass - - time.sleep(0.25) - - class AD2(object): """ High-level wrapper around AD2 devices. diff --git a/pyad2/devices.py b/pyad2/devices.py index 761deb9..927b397 100644 --- a/pyad2/devices.py +++ b/pyad2/devices.py @@ -142,23 +142,84 @@ class USBDevice(Device): BAUDRATE = 115200 """Default baudrate for AD2USB devices.""" - @staticmethod - def find_all(): + __devices = [] + + @classmethod + def find_all(cls, vid=FTDI_VENDOR_ID, pid=FTDI_PRODUCT_ID): """ Returns all FTDI devices matching our vendor and product IDs. :returns: list of devices :raises: CommError """ - devices = [] + cls.__devices = [] try: - devices = Ftdi.find_all([(USBDevice.FTDI_VENDOR_ID, USBDevice.FTDI_PRODUCT_ID)], nocache=True) + cls.__devices = Ftdi.find_all([(vid, pid)], nocache=True) except (usb.core.USBError, FtdiError), err: raise CommError('Error enumerating AD2USB devices: {0}'.format(str(err)), err) - return devices + return cls.__devices + + @classmethod + def devices(cls): + """ + Returns a cached list of AD2USB devices located on the system. + + :returns: cached list of devices found. + """ + return cls.__devices + + @classmethod + def find(cls, device=None): + """ + Factory method that returns the requested USBDevice device, or the first device. + + :param device: Tuple describing the USB device to open, as returned by find_all(). + :type device: tuple + + :returns: USBDevice object utilizing the specified device. + :raises: NoDeviceError + """ + cls.find_all() + + if len(cls.__devices) == 0: + raise NoDeviceError('No AD2USB devices present.') + + if device is None: + device = cls.__devices[0] + + vendor, product, sernum, ifcount, description = device + + return USBDevice(interface=sernum) + + @classmethod + def start_detection(cls, on_attached=None, on_detached=None): + """ + Starts the device detection thread. + + :param on_attached: function to be called when a device is attached. + :type on_attached: function + :param on_detached: function to be called when a device is detached. + :type on_detached: function + """ + cls.__detect_thread = USBDevice.DetectThread(on_attached, on_detached) + + cls.find_all() + + cls.__detect_thread.start() + + @classmethod + def stop_detection(cls): + """ + Stops the device detection thread. + """ + try: + cls.__detect_thread.stop() + + except: + pass @property def interface(self): @@ -267,7 +328,10 @@ class USBDevice(Device): self._device.set_baudrate(baudrate) - self._id = 'USB {0}:{1}'.format(self._device.usb_dev.bus, self._device.usb_dev.address) + if not self._serial_number: + self._serial_number = self._get_serial_number() + + self._id = self._serial_number except (usb.core.USBError, FtdiError), err: raise NoDeviceError('Error opening device: {0}'.format(str(err)), err) @@ -395,6 +459,71 @@ class USBDevice(Device): return ret + def _get_serial_number(self): + """ + Retrieves the FTDI device serial number. + + :returns: string containing the device serial number. + """ + return usb.util.get_string(self._device.usb_dev, 64, self._device.usb_dev.iSerialNumber) + + class DetectThread(threading.Thread): + """ + Thread that handles detection of added/removed devices. + """ + on_attached = event.Event('Called when an AD2USB device has been detected.') + on_detached = event.Event('Called when an AD2USB device has been removed.') + + def __init__(self, on_attached=None, on_detached=None): + """ + Constructor + + :param factory: AD2Factory object to use with the thread. + :type factory: AD2Factory + """ + threading.Thread.__init__(self) + + if on_attached: + self.on_attached += on_attached + + if on_detached: + self.on_detached += on_detached + + self._running = False + + def stop(self): + """ + Stops the thread. + """ + self._running = False + + def run(self): + """ + The actual detection process. + """ + self._running = True + + last_devices = set() + + while self._running: + try: + current_devices = set(USBDevice.find_all()) + + new_devices = [d for d in current_devices if d not in last_devices] + removed_devices = [d for d in last_devices if d not in current_devices] + last_devices = current_devices + + for d in new_devices: + self.on_attached(device=d) + + for d in removed_devices: + self.on_detached(device=d) + + except CommError, err: + pass + + time.sleep(0.25) + class SerialDevice(Device): """ diff --git a/pyad2/tests/test_ad2.py b/pyad2/tests/test_ad2.py index 0826905..a793195 100644 --- a/pyad2/tests/test_ad2.py +++ b/pyad2/tests/test_ad2.py @@ -3,70 +3,12 @@ import time from unittest import TestCase from mock import Mock, MagicMock, patch -from ..ad2 import AD2Factory, AD2 +from ..ad2 import AD2 from ..devices import USBDevice from ..messages import Message, RFMessage, LRRMessage, ExpanderMessage from ..event.event import Event, EventHandler from ..zonetracking import Zonetracker -class TestAD2Factory(TestCase): - def setUp(self): - self._attached = False - self._detached = False - - with patch.object(USBDevice, 'find_all', return_value=[(0, 0, 'AD2', 1, 'AD2')]): - self._factory = AD2Factory() - - def tearDown(self): - self._factory.stop() - - def attached_event(self, sender, *args, **kwargs): - self._attached = True - - def detached_event(self, sender, *args, **kwargs): - self._detached = True - - def test_find_all(self): - with patch.object(USBDevice, 'find_all', return_value=[(0, 0, 'AD2', 1, 'AD2')]): - devices = AD2Factory.find_all() - - self.assertEquals(devices[0][2], 'AD2') - - def test_create_default_param(self): - with patch.object(USBDevice, 'find_all', return_value=[(0, 0, 'AD2', 1, 'AD2')]): - device = AD2Factory.create() - - self.assertEquals(device._device.interface, 'AD2') - - def test_create_with_param(self): - with patch.object(USBDevice, 'find_all', return_value=[(0, 0, 'AD2-1', 1, 'AD2'), (0, 0, 'AD2-2', 1, 'AD2')]): - device = AD2Factory.create((0, 0, 'AD2-1', 1, 'AD2')) - self.assertEquals(device._device.interface, 'AD2-1') - - device = AD2Factory.create((0, 0, 'AD2-2', 1, 'AD2')) - self.assertEquals(device._device.interface, 'AD2-2') - - def test_events(self): - self.assertEquals(self._attached, False) - self.assertEquals(self._detached, False) - - # this is ugly, but it works. - self._factory.stop() - self._factory._detect_thread = AD2Factory.DetectThread(self._factory) - self._factory.on_attached += self.attached_event - self._factory.on_detached += self.detached_event - - with patch.object(USBDevice, 'find_all', return_value=[(0, 0, 'AD2-1', 1, 'AD2'), (0, 0, 'AD2-2', 1, 'AD2')]): - self._factory.start() - - with patch.object(USBDevice, 'find_all', return_value=[(0, 0, 'AD2-2', 1, 'AD2')]): - AD2Factory.find_all() - time.sleep(1) - self._factory.stop() - - self.assertEquals(self._attached, True) - self.assertEquals(self._detached, True) - class TestAD2(TestCase): def setUp(self): self._panicked = False diff --git a/pyad2/tests/test_devices.py b/pyad2/tests/test_devices.py index 7e7c42f..df41398 100644 --- a/pyad2/tests/test_devices.py +++ b/pyad2/tests/test_devices.py @@ -4,6 +4,7 @@ from serial import Serial, SerialException from pyftdi.pyftdi.ftdi import Ftdi, FtdiError from usb.core import USBError, Device as USBCoreDevice import socket +import time from OpenSSL import SSL, crypto from ..devices import USBDevice, SerialDevice, SocketDevice from ..util import NoDeviceError, CommError, TimeoutError @@ -17,9 +18,48 @@ class TestUSBDevice(TestCase): self._device._device.usb_dev.bus = 0 self._device._device.usb_dev.address = 0 + self._attached = False + self._detached = False + def tearDown(self): self._device.close() + def attached_event(self, sender, *args, **kwargs): + self._attached = True + + def detached_event(self, sender, *args, **kwargs): + self._detached = True + + def test_find_default_param(self): + with patch.object(Ftdi, 'find_all', return_value=[(0, 0, 'AD2', 1, 'AD2')]): + device = USBDevice.find() + + self.assertEquals(device.interface, 'AD2') + + def test_find_with_param(self): + with patch.object(Ftdi, 'find_all', return_value=[(0, 0, 'AD2-1', 1, 'AD2'), (0, 0, 'AD2-2', 1, 'AD2')]): + device = USBDevice.find((0, 0, 'AD2-1', 1, 'AD2')) + self.assertEquals(device.interface, 'AD2-1') + + device = USBDevice.find((0, 0, 'AD2-2', 1, 'AD2')) + self.assertEquals(device.interface, 'AD2-2') + + def test_events(self): + self.assertEquals(self._attached, False) + self.assertEquals(self._detached, False) + + # this is ugly, but it works. + with patch.object(USBDevice, 'find_all', return_value=[(0, 0, 'AD2-1', 1, 'AD2'), (0, 0, 'AD2-2', 1, 'AD2')]): + USBDevice.start_detection(on_attached=self.attached_event, on_detached=self.detached_event) + + with patch.object(USBDevice, 'find_all', return_value=[(0, 0, 'AD2-2', 1, 'AD2')]): + USBDevice.find_all() + time.sleep(1) + USBDevice.stop_detection() + + self.assertEquals(self._attached, True) + self.assertEquals(self._detached, True) + def test_find_all(self): with patch.object(USBDevice, 'find_all', return_value=[]) as mock: devices = USBDevice.find_all()