A clone of https://github.com/nutechsoftware/alarmdecoder. This is required because they dropped support for older firmware releases without building in backward-compatibility code, and they had previously hardcoded pyserial to a Python 2-only version.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

190 lines
5.2 KiB

  1. from pyftdi.pyftdi.ftdi import *
  2. from pyftdi.pyftdi.usbtools import *
  3. import time
  4. import usb.core
  5. import usb.util
  6. from .event import event
  7. import threading
  8. class NoDeviceError(Exception):
  9. pass
  10. class CommError(Exception):
  11. pass
  12. class AD2USB(object):
  13. on_test = event.Event('testing')
  14. on_open = event.Event('Called when the device has been opened')
  15. on_close = event.Event('Called when the device has been closed')
  16. on_read = event.Event('Called when a line has been read from the device')
  17. on_write = event.Event('Called when data has been written to the device')
  18. __devices = []
  19. @classmethod
  20. def find_all(cls):
  21. cls.__devices = Device.find_all()
  22. return cls.__devices
  23. def __init__(self):
  24. self._device = None
  25. AD2USB.find_all()
  26. def __del__(self):
  27. pass
  28. def open(self, device=None):
  29. if len(self.__devices) == 0:
  30. raise NoDeviceError('No AD2USB devices present.')
  31. if device is None:
  32. device = self.__devices[0]
  33. self._device = Device(serial=device[2], description=device[4])
  34. self._wire_events()
  35. self._device.open()
  36. def close(self):
  37. self._device.close()
  38. self._device = None
  39. def _wire_events(self):
  40. self._device.on_open += self.on_open
  41. self._device.on_close += self.on_close
  42. self._device.on_read += self.on_read
  43. self._device.on_write += self.on_write
  44. class Device(object):
  45. FTDI_VENDOR_ID = 0x0403
  46. FTDI_PRODUCT_ID = 0x6001
  47. BAUDRATE = 115200
  48. on_open = event.Event('Called when the device has been opened')
  49. on_close = event.Event('Called when the device has been closed')
  50. on_read = event.Event('Called when a line has been read from the device')
  51. on_write = event.Event('Called when data has been written to the device')
  52. @staticmethod
  53. def find_all():
  54. devices = []
  55. try:
  56. devices = Ftdi.find_all([(Device.FTDI_VENDOR_ID, Device.FTDI_PRODUCT_ID)], nocache=True)
  57. except (usb.core.USBError, FtdiError), err:
  58. raise CommError('Error enumerating AD2USB devices: {0}'.format(str(err)))
  59. return devices
  60. def __init__(self, vid=FTDI_VENDOR_ID, pid=FTDI_PRODUCT_ID, serial=None, description=None):
  61. self._vendor_id = vid
  62. self._product_id = pid
  63. self._serial_number = serial
  64. self._description = description
  65. self._buffer = ''
  66. self._device = Ftdi()
  67. self._running = False
  68. self._read_thread = Device.ReadThread(self)
  69. def open(self, baudrate=BAUDRATE, interface=0, index=0):
  70. self._running = True
  71. try:
  72. self._device.open(self._vendor_id,
  73. self._product_id,
  74. interface,
  75. index,
  76. self._serial_number,
  77. self._description)
  78. self._device.set_baudrate(baudrate)
  79. except (usb.core.USBError, FtdiError), err:
  80. self.on_close()
  81. raise CommError('Error opening AD2USB device: {0}'.format(str(err)))
  82. else:
  83. self._read_thread.start()
  84. self.on_open((self._serial_number, self._description))
  85. def close(self):
  86. try:
  87. self._running = False
  88. self._read_thread.stop()
  89. self._device.close()
  90. except (FtdiError, usb.core.USBError):
  91. pass
  92. self.on_close()
  93. def write(self, data):
  94. self._device.write_data(data)
  95. self.on_write(data)
  96. def read_line(self, timeout=0.0):
  97. start_time = time.time()
  98. got_line = False
  99. ret = None
  100. try:
  101. while self._running:
  102. buf = self._device.read_data(1)
  103. self._buffer += buf
  104. if buf == "\n":
  105. if len(self._buffer) > 1:
  106. if self._buffer[-2] == "\r":
  107. self._buffer = self._buffer[:-2]
  108. # ignore if we just got \r\n with nothing else in the buffer.
  109. if len(self._buffer) != 0:
  110. got_line = True
  111. break
  112. else:
  113. self._buffer = self._buffer[:-1]
  114. if timeout > 0 and time.time() - start_time > timeout:
  115. break
  116. time.sleep(0.01)
  117. except (usb.core.USBError, FtdiError), err:
  118. self.close()
  119. raise CommError('Error reading from AD2USB device: {0}'.format(str(err)))
  120. else:
  121. if got_line:
  122. ret = self._buffer
  123. self._buffer = ''
  124. self.on_read(ret)
  125. return ret
  126. class ReadThread(threading.Thread):
  127. def __init__(self, device):
  128. threading.Thread.__init__(self)
  129. self._device = device
  130. self._running = False
  131. def stop(self):
  132. self._running = False
  133. def run(self):
  134. self._running = True
  135. while self._running:
  136. try:
  137. self._device.read_line()
  138. except CommError, err:
  139. self.stop()
  140. time.sleep(0.25)