Browse Source

Merge branch 'overseer-rework' into release-prep

pyserial_fix
Scott Petersen 11 years ago
parent
commit
aa67728616
5 changed files with 177 additions and 223 deletions
  1. +1
    -1
      pyad2/__init__.py
  2. +0
    -157
      pyad2/ad2.py
  3. +135
    -6
      pyad2/devices.py
  4. +1
    -59
      pyad2/tests/test_ad2.py
  5. +40
    -0
      pyad2/tests/test_devices.py

+ 1
- 1
pyad2/__init__.py View File

@@ -1,4 +1,4 @@
import ad2
from ad2 import AD2
import devices
import util
import messages


+ 0
- 157
pyad2/ad2.py View File

@@ -8,167 +8,10 @@ import time
import threading

from .event import event
from .devices import USBDevice
from .util import CommError, NoDeviceError
from .messages import Message, ExpanderMessage, RFMessage, LRRMessage
from .zonetracking import Zonetracker

class AD2Factory(object):
"""
Factory for creation of AD2USB devices as well as provides attach/detach events."
"""

# Factory events
on_attached = event.Event('Called when an AD2USB device has been detected.')
on_detached = event.Event('Called when an AD2USB device has been removed.')

__devices = []

@classmethod
def find_all(cls):
"""
Returns all AD2USB devices located on the system.

:returns: list of devices found
:raises: CommError
"""
cls.__devices = USBDevice.find_all()

return cls.__devices

@classmethod
def devices(cls):
"""
Returns a cached list of AD2USB devices located on the system.

:returns: cached list of devices found.
"""
return cls.__devices

@classmethod
def create(cls, device=None):
"""
Factory method that returns the requested AD2USB device, or the first device.

:param device: Tuple describing the USB device to open, as returned by find_all().
:type device: tuple

:returns: AD2USB object utilizing the specified device.
:raises: NoDeviceError
"""
cls.find_all()

if len(cls.__devices) == 0:
raise NoDeviceError('No AD2USB devices present.')

if device is None:
device = cls.__devices[0]

vendor, product, sernum, ifcount, description = device
device = USBDevice(interface=sernum)

return AD2(device)

def __init__(self, attached_event=None, detached_event=None):
"""
Constructor

:param attached_event: Event to trigger when a device is attached.
:type attached_event: function
:param detached_event: Event to trigger when a device is detached.
:type detached_event: function
"""
self._detect_thread = AD2Factory.DetectThread(self)

if attached_event:
self.on_attached += attached_event

if detached_event:
self.on_detached += detached_event

AD2Factory.find_all()

self.start()

def close(self):
"""
Clean up and shut down.
"""
self.stop()

def start(self):
"""
Starts the detection thread, if not already running.
"""
if not self._detect_thread.is_alive():
self._detect_thread.start()

def stop(self):
"""
Stops the detection thread.
"""
self._detect_thread.stop()

def get_device(self, device=None):
"""
Factory method that returns the requested AD2USB device, or the first device.

:param device: Tuple describing the USB device to open, as returned by find_all().
:type device: tuple
"""
return AD2Factory.create(device)

class DetectThread(threading.Thread):
"""
Thread that handles detection of added/removed devices.
"""
def __init__(self, factory):
"""
Constructor

:param factory: AD2Factory object to use with the thread.
:type factory: AD2Factory
"""
threading.Thread.__init__(self)

self._factory = factory
self._running = False

def stop(self):
"""
Stops the thread.
"""
self._running = False

def run(self):
"""
The actual detection process.
"""
self._running = True

last_devices = set()

while self._running:
try:
AD2Factory.find_all()

current_devices = set(AD2Factory.devices())
new_devices = [d for d in current_devices if d not in last_devices]
removed_devices = [d for d in last_devices if d not in current_devices]
last_devices = current_devices

for d in new_devices:
self._factory.on_attached(device=d)

for d in removed_devices:
self._factory.on_detached(device=d)

except CommError, err:
pass

time.sleep(0.25)


class AD2(object):
"""
High-level wrapper around AD2 devices.


+ 135
- 6
pyad2/devices.py View File

@@ -142,23 +142,84 @@ class USBDevice(Device):
BAUDRATE = 115200
"""Default baudrate for AD2USB devices."""

@staticmethod
def find_all():
__devices = []

@classmethod
def find_all(cls, vid=FTDI_VENDOR_ID, pid=FTDI_PRODUCT_ID):
"""
Returns all FTDI devices matching our vendor and product IDs.

:returns: list of devices
:raises: CommError
"""
devices = []
cls.__devices = []

try:
devices = Ftdi.find_all([(USBDevice.FTDI_VENDOR_ID, USBDevice.FTDI_PRODUCT_ID)], nocache=True)
cls.__devices = Ftdi.find_all([(vid, pid)], nocache=True)

except (usb.core.USBError, FtdiError), err:
raise CommError('Error enumerating AD2USB devices: {0}'.format(str(err)), err)

return devices
return cls.__devices

@classmethod
def devices(cls):
"""
Returns a cached list of AD2USB devices located on the system.

:returns: cached list of devices found.
"""
return cls.__devices

@classmethod
def find(cls, device=None):
"""
Factory method that returns the requested USBDevice device, or the first device.

:param device: Tuple describing the USB device to open, as returned by find_all().
:type device: tuple

:returns: USBDevice object utilizing the specified device.
:raises: NoDeviceError
"""
cls.find_all()

if len(cls.__devices) == 0:
raise NoDeviceError('No AD2USB devices present.')

if device is None:
device = cls.__devices[0]

vendor, product, sernum, ifcount, description = device

return USBDevice(interface=sernum)

@classmethod
def start_detection(cls, on_attached=None, on_detached=None):
"""
Starts the device detection thread.

:param on_attached: function to be called when a device is attached.
:type on_attached: function
:param on_detached: function to be called when a device is detached.
:type on_detached: function
"""
cls.__detect_thread = USBDevice.DetectThread(on_attached, on_detached)

cls.find_all()

cls.__detect_thread.start()

@classmethod
def stop_detection(cls):
"""
Stops the device detection thread.
"""
try:
cls.__detect_thread.stop()

except:
pass

@property
def interface(self):
@@ -267,7 +328,10 @@ class USBDevice(Device):

self._device.set_baudrate(baudrate)

self._id = 'USB {0}:{1}'.format(self._device.usb_dev.bus, self._device.usb_dev.address)
if not self._serial_number:
self._serial_number = self._get_serial_number()

self._id = self._serial_number

except (usb.core.USBError, FtdiError), err:
raise NoDeviceError('Error opening device: {0}'.format(str(err)), err)
@@ -395,6 +459,71 @@ class USBDevice(Device):

return ret

def _get_serial_number(self):
"""
Retrieves the FTDI device serial number.

:returns: string containing the device serial number.
"""
return usb.util.get_string(self._device.usb_dev, 64, self._device.usb_dev.iSerialNumber)

class DetectThread(threading.Thread):
"""
Thread that handles detection of added/removed devices.
"""
on_attached = event.Event('Called when an AD2USB device has been detected.')
on_detached = event.Event('Called when an AD2USB device has been removed.')

def __init__(self, on_attached=None, on_detached=None):
"""
Constructor

:param factory: AD2Factory object to use with the thread.
:type factory: AD2Factory
"""
threading.Thread.__init__(self)

if on_attached:
self.on_attached += on_attached

if on_detached:
self.on_detached += on_detached

self._running = False

def stop(self):
"""
Stops the thread.
"""
self._running = False

def run(self):
"""
The actual detection process.
"""
self._running = True

last_devices = set()

while self._running:
try:
current_devices = set(USBDevice.find_all())

new_devices = [d for d in current_devices if d not in last_devices]
removed_devices = [d for d in last_devices if d not in current_devices]
last_devices = current_devices

for d in new_devices:
self.on_attached(device=d)

for d in removed_devices:
self.on_detached(device=d)

except CommError, err:
pass

time.sleep(0.25)


class SerialDevice(Device):
"""


+ 1
- 59
pyad2/tests/test_ad2.py View File

@@ -3,70 +3,12 @@ import time
from unittest import TestCase
from mock import Mock, MagicMock, patch

from ..ad2 import AD2Factory, AD2
from ..ad2 import AD2
from ..devices import USBDevice
from ..messages import Message, RFMessage, LRRMessage, ExpanderMessage
from ..event.event import Event, EventHandler
from ..zonetracking import Zonetracker

class TestAD2Factory(TestCase):
def setUp(self):
self._attached = False
self._detached = False

with patch.object(USBDevice, 'find_all', return_value=[(0, 0, 'AD2', 1, 'AD2')]):
self._factory = AD2Factory()

def tearDown(self):
self._factory.stop()

def attached_event(self, sender, *args, **kwargs):
self._attached = True

def detached_event(self, sender, *args, **kwargs):
self._detached = True

def test_find_all(self):
with patch.object(USBDevice, 'find_all', return_value=[(0, 0, 'AD2', 1, 'AD2')]):
devices = AD2Factory.find_all()

self.assertEquals(devices[0][2], 'AD2')

def test_create_default_param(self):
with patch.object(USBDevice, 'find_all', return_value=[(0, 0, 'AD2', 1, 'AD2')]):
device = AD2Factory.create()

self.assertEquals(device._device.interface, 'AD2')

def test_create_with_param(self):
with patch.object(USBDevice, 'find_all', return_value=[(0, 0, 'AD2-1', 1, 'AD2'), (0, 0, 'AD2-2', 1, 'AD2')]):
device = AD2Factory.create((0, 0, 'AD2-1', 1, 'AD2'))
self.assertEquals(device._device.interface, 'AD2-1')

device = AD2Factory.create((0, 0, 'AD2-2', 1, 'AD2'))
self.assertEquals(device._device.interface, 'AD2-2')

def test_events(self):
self.assertEquals(self._attached, False)
self.assertEquals(self._detached, False)

# this is ugly, but it works.
self._factory.stop()
self._factory._detect_thread = AD2Factory.DetectThread(self._factory)
self._factory.on_attached += self.attached_event
self._factory.on_detached += self.detached_event

with patch.object(USBDevice, 'find_all', return_value=[(0, 0, 'AD2-1', 1, 'AD2'), (0, 0, 'AD2-2', 1, 'AD2')]):
self._factory.start()

with patch.object(USBDevice, 'find_all', return_value=[(0, 0, 'AD2-2', 1, 'AD2')]):
AD2Factory.find_all()
time.sleep(1)
self._factory.stop()

self.assertEquals(self._attached, True)
self.assertEquals(self._detached, True)

class TestAD2(TestCase):
def setUp(self):
self._panicked = False


+ 40
- 0
pyad2/tests/test_devices.py View File

@@ -4,6 +4,7 @@ from serial import Serial, SerialException
from pyftdi.pyftdi.ftdi import Ftdi, FtdiError
from usb.core import USBError, Device as USBCoreDevice
import socket
import time
from OpenSSL import SSL, crypto
from ..devices import USBDevice, SerialDevice, SocketDevice
from ..util import NoDeviceError, CommError, TimeoutError
@@ -17,9 +18,48 @@ class TestUSBDevice(TestCase):
self._device._device.usb_dev.bus = 0
self._device._device.usb_dev.address = 0

self._attached = False
self._detached = False

def tearDown(self):
self._device.close()

def attached_event(self, sender, *args, **kwargs):
self._attached = True

def detached_event(self, sender, *args, **kwargs):
self._detached = True

def test_find_default_param(self):
with patch.object(Ftdi, 'find_all', return_value=[(0, 0, 'AD2', 1, 'AD2')]):
device = USBDevice.find()

self.assertEquals(device.interface, 'AD2')

def test_find_with_param(self):
with patch.object(Ftdi, 'find_all', return_value=[(0, 0, 'AD2-1', 1, 'AD2'), (0, 0, 'AD2-2', 1, 'AD2')]):
device = USBDevice.find((0, 0, 'AD2-1', 1, 'AD2'))
self.assertEquals(device.interface, 'AD2-1')

device = USBDevice.find((0, 0, 'AD2-2', 1, 'AD2'))
self.assertEquals(device.interface, 'AD2-2')

def test_events(self):
self.assertEquals(self._attached, False)
self.assertEquals(self._detached, False)

# this is ugly, but it works.
with patch.object(USBDevice, 'find_all', return_value=[(0, 0, 'AD2-1', 1, 'AD2'), (0, 0, 'AD2-2', 1, 'AD2')]):
USBDevice.start_detection(on_attached=self.attached_event, on_detached=self.detached_event)

with patch.object(USBDevice, 'find_all', return_value=[(0, 0, 'AD2-2', 1, 'AD2')]):
USBDevice.find_all()
time.sleep(1)
USBDevice.stop_detection()

self.assertEquals(self._attached, True)
self.assertEquals(self._detached, True)

def test_find_all(self):
with patch.object(USBDevice, 'find_all', return_value=[]) as mock:
devices = USBDevice.find_all()


Loading…
Cancel
Save