Browse Source

Docstrings, yay.

pyserial_fix
Scott Petersen 11 years ago
parent
commit
8502e704fc
5 changed files with 303 additions and 8 deletions
  1. +5
    -1
      pyad2usb/__init__.py
  2. +189
    -5
      pyad2usb/ad2usb.py
  3. +81
    -0
      pyad2usb/devices.py
  4. +26
    -0
      pyad2usb/util.py
  5. +2
    -2
      test.py

+ 5
- 1
pyad2usb/__init__.py View File

@@ -1 +1,5 @@
__all__ = ['AD2USB', 'Device']
"""
The PyAD2USB module.
"""

__all__ = ['Overseer', 'AD2USB', 'USBDevice', 'SerialDevice', 'Firmware']

+ 189
- 5
pyad2usb/ad2usb.py View File

@@ -1,3 +1,7 @@
"""
Provides the full AD2USB class and factory.
"""

import time
import threading
from .event import event
@@ -5,6 +9,10 @@ from . import devices
from . import util

class Overseer(object):
"""
Factory for creation of AD2USB devices as well as provide4s attach/detach events."
"""

on_attached = event.Event('Called when an AD2USB device has been detected.')
on_detached = event.Event('Called when an AD2USB device has been removed.')

@@ -12,16 +20,25 @@ class Overseer(object):

@classmethod
def find_all(cls):
"""
Returns all AD2USB devices located on the system.
"""
cls.__devices = devices.USBDevice.find_all()

return cls.__devices

@classmethod
def devices(cls):
"""
Returns a cached list of AD2USB devices located on the system.
"""
return cls.__devices

@classmethod
def create(cls, device=None):
"""
Factory method that returns the requested AD2USB device, or the first device.
"""
cls.find_all()

if len(cls.__devices) == 0:
@@ -36,6 +53,9 @@ class Overseer(object):
return AD2USB(device)

def __init__(self, attached_event=None, detached_event=None):
"""
Constructor
"""
self._detect_thread = Overseer.DetectThread(self)

if attached_event:
@@ -49,33 +69,60 @@ class Overseer(object):
self.start()

def __del__(self):
"""
Destructor
"""
pass

def close(self):
"""
Clean up and shut down.
"""
self.stop()

def start(self):
"""
Starts the detection thread, if not already running.
"""
if not self._detect_thread.is_alive():
self._detect_thread.start()

def stop(self):
"""
Stops the detection thread.
"""
self._detect_thread.stop()

def get_device(self, device=None):
"""
Factory method that returns the requested AD2USB device, or the first device.
"""
return Overseer.create(device)


class DetectThread(threading.Thread):
"""
Thread that handles detection of added/removed devices.
"""
def __init__(self, overseer):
"""
Constructor
"""
threading.Thread.__init__(self)

self._overseer = overseer
self._running = False

def stop(self):
"""
Stops the thread.
"""
self._running = False

def run(self):
"""
The actual detection process.
"""
self._running = True

last_devices = set()
@@ -101,6 +148,10 @@ class Overseer(object):


class AD2USB(object):
"""
High-level wrapper around AD2USB/AD2SERIAL devices.
"""

on_open = event.Event('Called when the device has been opened')
on_close = event.Event('Called when the device has been closed')
on_read = event.Event('Called when a line has been read from the device')
@@ -109,26 +160,44 @@ class AD2USB(object):
on_message = event.Event('Called when a message has been received from the device.')

def __init__(self, device):
"""
Constructor
"""
self._device = device

def __del__(self):
"""
Destructor
"""
pass

def open(self, baudrate=None, interface=None, index=None):
"""
Opens the device.
"""
self._wire_events()
self._device.open(baudrate=baudrate, interface=interface, index=index)

def close(self):
"""
Closes the device.
"""
self._device.close()
self._device = None

def _wire_events(self):
"""
Wires up the internal device events.
"""
self._device.on_open += self._on_open
self._device.on_close += self._on_close
self._device.on_read += self._on_read
self._device.on_write += self._on_write

def _handle_message(self, data):
"""
Parses messages from the panel.
"""
if data[0] == '!':
return None

@@ -138,12 +207,21 @@ class AD2USB(object):
print msg.ignore_packet

def _on_open(self, sender, args):
"""
Internal handler for opening the device.
"""
self.on_open(args)

def _on_close(self, sender, args):
"""
Internal handler for closing the device.
"""
self.on_close(args)

def _on_read(self, sender, args):
"""
Internal handler for reading from the device.
"""
msg = self._handle_message(args)
if msg:
self.on_message(msg)
@@ -151,10 +229,20 @@ class AD2USB(object):
self.on_read(args)

def _on_write(self, sender, args):
"""
Internal handler for writing to the device.
"""
self.on_write(args)

class Message(object):
"""
Represents a message from the alarm panel.
"""

def __init__(self):
"""
Constructor
"""
self._ignore_packet = False
self._ready = False
self._armed_away = False
@@ -170,132 +258,228 @@ class Message(object):
self._numeric = ""
self._text = ""
self._cursor = -1
self._raw_text = ""
self._raw = ""

@property
def ignore_packet(self):
"""
Indicates whether or not this message should be ignored.
"""
return self._ignore_packet

@ignore_packet.setter
def ignore_packet(self, value):
"""
Sets the value indicating whether or not this packet should be ignored.
"""
self._ignore_packet = value

@property
def ready(self):
"""
Indicates whether or not the panel is ready.
"""
return self._ready

@ready.setter
def ready(self, value):
"""
Sets the value indicating whether or not the panel is ready.
"""
self._ready = value

@property
def armed_away(self):
"""
Indicates whether or not the panel is armed in away mode.
"""
return self._armed_away

@armed_away.setter
def armed_away(self, value):
"""
Sets the value indicating whether or not the panel is armed in away mode.
"""
self._armed_away = value

@property
def armed_home(self):
"""
Indicates whether or not the panel is armed in home/stay mode.
"""
return self._armed_home

@armed_home.setter
def armed_home(self, value):
"""
Sets the value indicating whether or not the panel is armed in home/stay mode.
"""
self._armed_home = value

@property
def backlight(self):
"""
Indicates whether or not the panel backlight is on.
"""
return self._backlight

@backlight.setter
def backlight(self, value):
"""
Sets the value indicating whether or not the panel backlight is on.
"""
self._backlight = value

@property
def programming_mode(self):
"""
Indicates whether or not the panel is in programming mode.
"""
return self._programming_mode

@programming_mode.setter
def programming_mode(self, value):
"""
Sets the value indicating whether or not the panel is in programming mode.
"""
self._programming_mode = value

@property
def beeps(self):
"""
Returns the number of beeps associated with this message.
"""
return self._beeps

@beeps.setter
def beeps(self, value):
"""
Sets the number of beeps associated with this message.
"""
self._beeps = value

@property
def bypass(self):
"""
Indicates whether or not zones have been bypassed.
"""
return self._bypass

@bypass.setter
def bypass(self, value):
"""
Sets the value indicating whether or not zones have been bypassed.
"""
self._bypass = value

@property
def ac(self):
"""
Indicates whether or not the system is on AC power.
"""
return self._ac

@ac.setter
def ac(self, value):
"""
Sets the value indicating whether or not the system is on AC power.
"""
self._ac = value

@property
def chime_mode(self):
"""
Indicates whether or not panel chimes are enabled.
"""
return self._chime_mode

@chime_mode.setter
def chime_mode(self, value):
"""
Sets the value indicating whether or not the panel chimes are enabled.
"""
self._chime_mode = value

@property
def alarm_event_occurred(self):
"""
Indicates whether or not an alarm event has occurred.
"""
return self._alarm_event_occurred

@alarm_event_occurred.setter
def alarm_event_occurred(self, value):
"""
Sets the value indicating whether or not an alarm event has occurred.
"""
self._alarm_event_occurred = value

@property
def alarm_bell(self):
"""
Indicates whether or not an alarm is currently sounding.
"""
return self._alarm_bell

@alarm_bell.setter
def alarm_bell(self, value):
"""
Sets the value indicating whether or not an alarm is currently sounding.
"""
self._alarm_bell = value

@property
def numeric(self):
"""
Numeric indicator of associated with message. For example: If zone #3 is faulted, this value is 003.
"""
return self._numeric

@numeric.setter
def numeric(self, value):
"""
Sets the numeric indicator associated with this message.
"""
self._numeric = value

@property
def text(self):
"""
Alphanumeric text associated with this message.
"""
return self._text

@text.setter
def text(self, value):
"""
Sets the alphanumeric text associated with this message.
"""
self._text = value

@property
def cursor(self):
"""
Indicates which text position has the cursor underneath it.
"""
return self._cursor

@cursor.setter
def cursor(self, value):
"""
Sets the value indicating which text position has the cursor underneath it.
"""
self._cursor = value

@property
def raw_text(self):
return self._raw_text
def raw(self):
"""
Raw representation of the message data from the panel.
"""
return self._raw

@raw_text.setter
def raw_text(self, value):
self._raw_text = value
def raw(self, value):
"""
Sets the raw representation of the message data from the panel.
"""
self._raw = value

+ 81
- 0
pyad2usb/devices.py View File

@@ -1,3 +1,7 @@
"""
Contains different types of devices belonging to the AD2USB family.
"""

import usb.core
import usb.util
import time
@@ -11,6 +15,10 @@ from . import util
from .event import event

class Device(object):
"""
Generic parent device to all AD2USB products.
"""

on_open = event.Event('Called when the device has been opened')
on_close = event.Event('Called when the device has been closed')
on_read = event.Event('Called when a line has been read from the device')
@@ -23,15 +31,28 @@ class Device(object):
pass

class ReadThread(threading.Thread):
"""
Reader thread which processes messages from the device.
"""

def __init__(self, device):
"""
Constructor
"""
threading.Thread.__init__(self)
self._device = device
self._running = False

def stop(self):
"""
Stops the running thread.
"""
self._running = False

def run(self):
"""
The actual read process.
"""
self._running = True

while self._running:
@@ -45,12 +66,20 @@ class Device(object):
time.sleep(0.01)

class USBDevice(Device):
"""
AD2USB device exposed with PyFTDI's interface.
"""

FTDI_VENDOR_ID = 0x0403
FTDI_PRODUCT_ID = 0x6001
BAUDRATE = 115200

@staticmethod
def find_all():
"""
Returns all FTDI devices matching our vendor and product IDs.
"""

devices = []

try:
@@ -61,6 +90,10 @@ class USBDevice(Device):
return devices

def __init__(self, vid=FTDI_VENDOR_ID, pid=FTDI_PRODUCT_ID, serial=None, description=None, interface=0):
"""
Constructor
"""

Device.__init__(self)

self._vendor_id = vid
@@ -75,6 +108,9 @@ class USBDevice(Device):
self._read_thread = Device.ReadThread(self)

def open(self, baudrate=BAUDRATE, interface=None, index=0):
"""
Opens the device.
"""
self._running = True

if baudrate is None:
@@ -108,6 +144,9 @@ class USBDevice(Device):
self.on_open((self._serial_number, self._description))

def close(self):
"""
Closes the device.
"""
try:
self._running = False
self._read_thread.stop()
@@ -122,17 +161,29 @@ class USBDevice(Device):
self.on_close()

def close_reader(self):
"""
Stops the reader thread.
"""
self._read_thread.stop()

def write(self, data):
"""
Writes data to the device.
"""
self._device.write_data(data)

self.on_write(data)

def read(self):
"""
Reads a single character from the device.
"""
return self._device.read_data(1)

def read_line(self, timeout=0.0):
"""
Reads a line from the device.
"""
start_time = time.time()
got_line = False
ret = None
@@ -172,10 +223,16 @@ class USBDevice(Device):


class SerialDevice(Device):
"""
AD2USB or AD2SERIAL device exposed with the pyserial interface.
"""
BAUDRATE = 19200

@staticmethod
def find_all():
"""
Returns all serial ports present.
"""
devices = []

try:
@@ -186,6 +243,9 @@ class SerialDevice(Device):
return devices

def __init__(self, interface=None):
"""
Constructor
"""
Device.__init__(self)

self._device = serial.Serial(timeout=0) # Timeout = non-blocking to match pyftdi.
@@ -195,9 +255,15 @@ class SerialDevice(Device):
self._interface = interface

def __del__(self):
"""
Destructor
"""
pass

def open(self, baudrate=BAUDRATE, interface=None, index=None):
"""
Opens the device.
"""
if baudrate is None:
baudrate = SerialDevice.BAUDRATE

@@ -224,6 +290,9 @@ class SerialDevice(Device):
self._read_thread.start()

def close(self):
"""
Closes the device.
"""
try:
self._running = False
self._read_thread.stop()
@@ -235,9 +304,15 @@ class SerialDevice(Device):
self.on_close()

def close_reader(self):
"""
Stops the reader thread.
"""
self._read_thread.stop()

def write(self, data):
"""
Writes data to the device.
"""
try:
self._device.write(data)
except serial.SerialTimeoutException, err:
@@ -246,9 +321,15 @@ class SerialDevice(Device):
self.on_write(data)

def read(self):
"""
Reads a single character from the device.
"""
return self._device.read(1)

def read_line(self, timeout=0.0):
"""
Reads a line from the device.
"""
start_time = time.time()
got_line = False
ret = None


+ 26
- 0
pyad2usb/util.py View File

@@ -1,17 +1,34 @@
"""
Provides utility classes for the AD2USB devices.
"""

import ad2usb
import time
import traceback

class NoDeviceError(Exception):
"""
No devices found.
"""
pass

class CommError(Exception):
"""
There was an error communicating with the device.
"""
pass

class TimeoutError(Exception):
"""
There was a timeout while trying to communicate with the device.
"""
pass

class Firmware(object):
"""
Represents firmware for the AD2USB/AD2SERIAL devices.
"""

STAGE_START = 0
STAGE_WAITING = 1
STAGE_BOOT = 2
@@ -20,13 +37,22 @@ class Firmware(object):
STAGE_DONE = 5

def __init__(self):
"""
Constructor
"""
pass

def __del__(self):
"""
Destructor
"""
pass

@staticmethod
def upload(dev, filename, progress_callback=None):
"""
Uploads firmware to an AD2USB/AD2SERIAL device.
"""
def do_upload():
with open(filename) as f:
for line in f:


+ 2
- 2
test.py View File

@@ -166,11 +166,11 @@ try:
#test_serial()
#upload_serial()

#test_usb()
test_usb()
#test_usb_serial()
#test_factory()
#test_factory_watcher()
upload_usb()
#upload_usb()
#upload_usb_serial()

except Exception, err:


Loading…
Cancel
Save