A clone of: https://github.com/nutechsoftware/alarmdecoder. This is required because they dropped support for older firmware releases without building in backward-compatibility code, and they had previously hardcoded pyserial to a Python 2-only version.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

125 lines
3.2 KiB

  1. import time
  2. import threading
  3. from .event import event
  4. from . import devices
  5. from . import util
  6. class Overseer(object):
  7. on_attached = event.Event('Called when an AD2USB device has been detected.')
  8. on_detached = event.Event('Called when an AD2USB device has been removed.')
  9. __devices = []
  10. @classmethod
  11. def find_all(cls):
  12. cls.__devices = devices.USBDevice.find_all()
  13. return cls.__devices
  14. @classmethod
  15. def devices(cls):
  16. return cls.__devices
  17. @classmethod
  18. def create(cls, device=None):
  19. if len(cls.__devices) == 0:
  20. raise util.NoDeviceError('No AD2USB devices present.')
  21. if device is None:
  22. device = cls.__devices[0]
  23. vendor, product, sernum, ifcount, description = device
  24. device = devices.USBDevice(serial=sernum, description=description)
  25. return AD2USB(device)
  26. def __init__(self, attached_event=None, detached_event=None):
  27. self._detect_thread = Overseer.DetectThread(self)
  28. if attached_event:
  29. self.on_attached += attached_event
  30. if detached_event:
  31. self.on_detached += detached_event
  32. Overseer.find_all()
  33. self.start()
  34. def __del__(self):
  35. pass
  36. def close(self):
  37. self.stop()
  38. def start(self):
  39. self._detect_thread.start()
  40. def stop(self):
  41. self._detect_thread.stop()
  42. def get_device(self, device=None):
  43. return Overseer.create(device)
  44. class DetectThread(threading.Thread):
  45. def __init__(self, overseer):
  46. threading.Thread.__init__(self)
  47. self._overseer = overseer
  48. self._running = False
  49. def stop(self):
  50. self._running = False
  51. def run(self):
  52. self._running = True
  53. last_devices = set()
  54. while self._running:
  55. try:
  56. Overseer.find_all()
  57. current_devices = set(Overseer.devices())
  58. new_devices = [d for d in current_devices if d not in last_devices]
  59. removed_devices = [d for d in last_devices if d not in current_devices]
  60. last_devices = current_devices
  61. for d in new_devices:
  62. self._overseer.on_attached(d)
  63. for d in removed_devices:
  64. self._overseer.on_detached(d)
  65. except util.CommError, err:
  66. pass
  67. time.sleep(0.25)
  68. class AD2USB(object):
  69. on_open = event.Event('Called when the device has been opened')
  70. on_close = event.Event('Called when the device has been closed')
  71. on_read = event.Event('Called when a line has been read from the device')
  72. on_write = event.Event('Called when data has been written to the device')
  73. def __init__(self, device):
  74. self._device = device
  75. def __del__(self):
  76. pass
  77. def open(self):
  78. self._wire_events()
  79. self._device.open()
  80. def close(self):
  81. self._device.close()
  82. self._device = None
  83. def _wire_events(self):
  84. self._device.on_open += self.on_open
  85. self._device.on_close += self.on_close
  86. self._device.on_read += self.on_read
  87. self._device.on_write += self.on_write