A clone of: https://github.com/nutechsoftware/alarmdecoder This is requires as they dropped support for older firmware releases w/o building in backward compatibility code, and they had previously hardcoded pyserial to a python2 only version.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

140 lines
3.7 KiB

  1. import time
  2. import threading
  3. from .event import event
  4. from . import devices
  5. from . import util
  6. class Overseer(object):
  7. on_attached = event.Event('Called when an AD2USB device has been detected.')
  8. on_detached = event.Event('Called when an AD2USB device has been removed.')
  9. __devices = []
  10. @classmethod
  11. def find_all(cls):
  12. cls.__devices = devices.USBDevice.find_all()
  13. return cls.__devices
  14. @classmethod
  15. def devices(cls):
  16. return cls.__devices
  17. @classmethod
  18. def create(cls, device=None):
  19. cls.find_all()
  20. if len(cls.__devices) == 0:
  21. raise util.NoDeviceError('No AD2USB devices present.')
  22. if device is None:
  23. device = cls.__devices[0]
  24. vendor, product, sernum, ifcount, description = device
  25. device = devices.USBDevice(serial=sernum, description=description)
  26. return AD2USB(device)
  27. def __init__(self, attached_event=None, detached_event=None):
  28. self._detect_thread = Overseer.DetectThread(self)
  29. if attached_event:
  30. self.on_attached += attached_event
  31. if detached_event:
  32. self.on_detached += detached_event
  33. Overseer.find_all()
  34. self.start()
  35. def __del__(self):
  36. pass
  37. def close(self):
  38. self.stop()
  39. def start(self):
  40. if not self._detect_thread.is_alive():
  41. self._detect_thread.start()
  42. def stop(self):
  43. self._detect_thread.stop()
  44. def get_device(self, device=None):
  45. return Overseer.create(device)
  46. class DetectThread(threading.Thread):
  47. def __init__(self, overseer):
  48. threading.Thread.__init__(self)
  49. self._overseer = overseer
  50. self._running = False
  51. def stop(self):
  52. self._running = False
  53. def run(self):
  54. self._running = True
  55. last_devices = set()
  56. while self._running:
  57. try:
  58. Overseer.find_all()
  59. current_devices = set(Overseer.devices())
  60. new_devices = [d for d in current_devices if d not in last_devices]
  61. removed_devices = [d for d in last_devices if d not in current_devices]
  62. last_devices = current_devices
  63. for d in new_devices:
  64. self._overseer.on_attached(d)
  65. for d in removed_devices:
  66. self._overseer.on_detached(d)
  67. except util.CommError, err:
  68. pass
  69. time.sleep(0.25)
  70. class AD2USB(object):
  71. on_open = event.Event('Called when the device has been opened')
  72. on_close = event.Event('Called when the device has been closed')
  73. on_read = event.Event('Called when a line has been read from the device')
  74. on_write = event.Event('Called when data has been written to the device')
  75. def __init__(self, device):
  76. self._device = device
  77. def __del__(self):
  78. pass
  79. def open(self, baudrate=None, interface=None, index=None):
  80. self._wire_events()
  81. self._device.open(baudrate=baudrate, interface=interface, index=index)
  82. def close(self):
  83. self._device.close()
  84. self._device = None
  85. def _wire_events(self):
  86. self._device.on_open += self._on_open
  87. self._device.on_close += self._on_close
  88. self._device.on_read += self._on_read
  89. self._device.on_write += self._on_write
  90. def _on_open(self, sender, args):
  91. self.on_open(args)
  92. def _on_close(self, sender, args):
  93. self.on_close(args)
  94. def _on_read(self, sender, args):
  95. self.on_read(args)
  96. def _on_write(self, sender, args):
  97. self.on_write(args)