Browse Source

Initial implementation of the overseer factory thingy.

pyserial_fix
Scott Petersen 11 years ago
parent
commit
a8aaa5ea1e
2 changed files with 75 additions and 5 deletions
  1. +60
    -2
      pyad2usb/ad2usb.py
  2. +15
    -3
      test.py

+ 60
- 2
pyad2usb/ad2usb.py View File

@@ -12,9 +12,67 @@ class NoDeviceError(Exception):
class CommError(Exception):
pass

class AD2USB(object):
on_test = event.Event('testing')
class Overseer(object):
on_attached = event.Event('Called when an AD2USB device has been detected.')
on_detached = event.Event('Called when an AD2USB device has been removed.')

def __init__(self, attached_event=None, detached_event=None):
self._detect_thread = Overseer.DetectThread(self)

if attached_event:
self.on_attached += attached_event

if detached_event:
self.on_detached += detached_event

self.start()

def __del__(self):
pass

def start(self):
self._detect_thread.start()

def stop(self):
self._detect_thread.stop()

class DetectThread(threading.Thread):
def __init__(self, overseer):
threading.Thread.__init__(self)

self._overseer = overseer
self._running = False

def stop(self):
self._running = False

def run(self):
self._running = True

last_devices = set()

while self._running:
try:
AD2USB.find_all()

current_devices = set(AD2USB._AD2USB__devices)

new_devices = [d for d in current_devices if d not in last_devices]
removed_devices = [d for d in last_devices if d not in current_devices]
last_devices = current_devices

for d in new_devices:
self._overseer.on_attached(d)

for d in removed_devices:
self._overseer.on_detached(d)
except CommError, err:
pass

time.sleep(0.25)


class AD2USB(object):
on_open = event.Event('Called when the device has been opened')
on_close = event.Event('Called when the device has been closed')
on_read = event.Event('Called when a line has been read from the device')


+ 15
- 3
test.py View File

@@ -24,22 +24,34 @@ def handle_read(sender, args):
def handle_write(sender, args):
print 'write', args

def handle_attached(sender, args):
print 'attached', args

def handle_detached(sender, args):
print 'detached', args


signal.signal(signal.SIGINT, signal_handler)

try:
wut = pyad2usb.ad2usb.AD2USB()
overseer = pyad2usb.ad2usb.Overseer(attached_event=handle_attached, detached_event=handle_detached)
#overseer.start()

"""wut = pyad2usb.ad2usb.AD2USB()

wut.on_open += handle_open
wut.on_close += handle_close
wut.on_read += handle_read
wut.on_write += handle_write

wut.open()
wut.open()"""

while running:
time.sleep(0.1)

wut.close()
overseer.stop()

#wut.close()
except Exception, err:
print 'Error: {0}'.format(str(err))
#traceback.print_exc(err)

Loading…
Cancel
Save