A clone of: https://github.com/nutechsoftware/alarmdecoder This is requires as they dropped support for older firmware releases w/o building in backward compatibility code, and they had previously hardcoded pyserial to a python2 only version.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

330 lines
12 KiB

  1. from unittest import TestCase
  2. from mock import Mock, MagicMock, patch
  3. from serial import Serial, SerialException
  4. from pyftdi.pyftdi.ftdi import Ftdi, FtdiError
  5. from usb.core import USBError, Device as USBCoreDevice
  6. import socket
  7. from OpenSSL import SSL, crypto
  8. from ..devices import USBDevice, SerialDevice, SocketDevice
  9. from ..util import NoDeviceError, CommError, TimeoutError
  10. class TestUSBDevice(TestCase):
  11. def setUp(self):
  12. self._device = USBDevice()
  13. self._device._device = Mock(spec=Ftdi)
  14. self._device._device.usb_dev = Mock(spec=USBCoreDevice)
  15. self._device._device.usb_dev.bus = 0
  16. self._device._device.usb_dev.address = 0
  17. def tearDown(self):
  18. self._device.close()
  19. def test_find_all(self):
  20. with patch.object(USBDevice, 'find_all', return_value=[]) as mock:
  21. devices = USBDevice.find_all()
  22. self.assertEquals(devices, [])
  23. def test_find_all_exception(self):
  24. with patch.object(Ftdi, 'find_all', side_effect=[USBError('testing'), FtdiError]) as mock:
  25. with self.assertRaises(CommError):
  26. devices = USBDevice.find_all()
  27. with self.assertRaises(CommError):
  28. devices = USBDevice.find_all()
  29. def test_interface_serial_number(self):
  30. self._device.interface = 'AD2USB'
  31. self.assertEquals(self._device.interface, 'AD2USB')
  32. self.assertEquals(self._device.serial_number, 'AD2USB')
  33. self.assertEquals(self._device._device_number, 0)
  34. def test_interface_index(self):
  35. self._device.interface = 1
  36. self.assertEquals(self._device.interface, 1)
  37. self.assertEquals(self._device.serial_number, None)
  38. self.assertEquals(self._device._device_number, 1)
  39. def test_open(self):
  40. self._device.interface = 'AD2USB'
  41. with patch.object(self._device._device, 'open') as mock:
  42. self._device.open(no_reader_thread=True)
  43. mock.assert_any_calls()
  44. def test_open_failed(self):
  45. self._device.interface = 'AD2USB'
  46. with patch.object(self._device._device, 'open', side_effect=[USBError('testing'), FtdiError]):
  47. with self.assertRaises(NoDeviceError):
  48. self._device.open(no_reader_thread=True)
  49. with self.assertRaises(NoDeviceError):
  50. self._device.open(no_reader_thread=True)
  51. def test_write(self):
  52. self._device.interface = 'AD2USB'
  53. self._device.open(no_reader_thread=True)
  54. with patch.object(self._device._device, 'write_data') as mock:
  55. self._device.write('test')
  56. mock.assert_called_with('test')
  57. def test_write_exception(self):
  58. with patch.object(self._device._device, 'write_data', side_effect=FtdiError):
  59. with self.assertRaises(CommError):
  60. self._device.write('test')
  61. def test_read(self):
  62. self._device.interface = 'AD2USB'
  63. self._device.open(no_reader_thread=True)
  64. with patch.object(self._device._device, 'read_data') as mock:
  65. self._device.read()
  66. mock.assert_called_with(1)
  67. def test_read_exception(self):
  68. with patch.object(self._device._device, 'read_data', side_effect=[USBError('testing'), FtdiError]):
  69. with self.assertRaises(CommError):
  70. self._device.read()
  71. with self.assertRaises(CommError):
  72. self._device.read()
  73. def test_read_line(self):
  74. with patch.object(self._device._device, 'read_data', side_effect=list("testing\r\n")):
  75. ret = None
  76. try:
  77. ret = self._device.read_line()
  78. except StopIteration:
  79. pass
  80. self.assertEquals(ret, "testing")
  81. def test_read_line_timeout(self):
  82. with patch.object(self._device._device, 'read_data', return_value='a') as mock:
  83. with self.assertRaises(TimeoutError):
  84. self._device.read_line(timeout=0.1)
  85. self.assertIn('a', self._device._buffer)
  86. def test_read_line_exception(self):
  87. with patch.object(self._device._device, 'read_data', side_effect=[USBError('testing'), FtdiError]):
  88. with self.assertRaises(CommError):
  89. self._device.read_line()
  90. with self.assertRaises(CommError):
  91. self._device.read_line()
  92. class TestSerialDevice(TestCase):
  93. def setUp(self):
  94. self._device = SerialDevice()
  95. self._device._device = Mock(spec=Serial)
  96. self._device._device.open = Mock()
  97. def tearDown(self):
  98. self._device.close()
  99. def test_open(self):
  100. self._device.interface = '/dev/ttyS0'
  101. with patch.object(self._device._device, 'open') as mock:
  102. self._device.open(no_reader_thread=True)
  103. mock.assert_called_with()
  104. def test_open_no_interface(self):
  105. with self.assertRaises(NoDeviceError):
  106. self._device.open(no_reader_thread=True)
  107. self.assertFalse(self._device._running)
  108. def test_open_failed(self):
  109. self._device.interface = '/dev/ttyS0'
  110. with patch.object(self._device._device, 'open', side_effect=[SerialException, ValueError]):
  111. with self.assertRaises(NoDeviceError):
  112. self._device.open(no_reader_thread=True)
  113. with self.assertRaises(NoDeviceError):
  114. self._device.open(no_reader_thread=True)
  115. def test_write(self):
  116. self._device.interface = '/dev/ttyS0'
  117. self._device.open(no_reader_thread=True)
  118. with patch.object(self._device._device, 'write') as mock:
  119. self._device.write('test')
  120. mock.assert_called_with('test')
  121. def test_write_exception(self):
  122. with patch.object(self._device._device, 'write', side_effect=SerialException):
  123. with self.assertRaises(CommError):
  124. self._device.write('test')
  125. def test_read(self):
  126. self._device.interface = '/dev/ttyS0'
  127. self._device.open(no_reader_thread=True)
  128. with patch.object(self._device._device, 'read') as mock:
  129. self._device.read()
  130. mock.assert_called_with(1)
  131. def test_read_exception(self):
  132. with patch.object(self._device._device, 'read', side_effect=SerialException):
  133. with self.assertRaises(CommError):
  134. self._device.read()
  135. def test_read_line(self):
  136. with patch.object(self._device._device, 'read', side_effect=list("testing\r\n")):
  137. ret = None
  138. try:
  139. ret = self._device.read_line()
  140. except StopIteration:
  141. pass
  142. self.assertEquals(ret, "testing")
  143. def test_read_line_timeout(self):
  144. with patch.object(self._device._device, 'read', return_value='a') as mock:
  145. with self.assertRaises(TimeoutError):
  146. self._device.read_line(timeout=0.1)
  147. self.assertIn('a', self._device._buffer)
  148. def test_read_line_exception(self):
  149. with patch.object(self._device._device, 'read', side_effect=[OSError, SerialException]):
  150. with self.assertRaises(CommError):
  151. self._device.read_line()
  152. with self.assertRaises(CommError):
  153. self._device.read_line()
  154. class TestSocketDevice(TestCase):
  155. def setUp(self):
  156. self._device = SocketDevice()
  157. self._device._device = Mock(spec=socket.socket)
  158. def tearDown(self):
  159. self._device.close()
  160. def test_open(self):
  161. with patch.object(socket.socket, '__init__', return_value=None):
  162. with patch.object(socket.socket, 'connect', return_value=None) as mock:
  163. self._device.open(no_reader_thread=True)
  164. mock.assert_called_with(self._device.interface)
  165. def test_open_no_interface(self):
  166. with self.assertRaises(NoDeviceError):
  167. self._device.open(no_reader_thread=True)
  168. self.assertFalse(self._device._running)
  169. def test_open_failed(self):
  170. with patch.object(self._device._device, 'connect', side_effect=socket.error):
  171. with self.assertRaises(NoDeviceError):
  172. self._device.open(no_reader_thread=True)
  173. def test_write(self):
  174. with patch.object(socket.socket, '__init__', return_value=None):
  175. with patch.object(socket.socket, 'connect', return_value=None):
  176. self._device.open(no_reader_thread=True)
  177. with patch.object(socket.socket, 'send') as mock:
  178. self._device.write('test')
  179. mock.assert_called_with('test')
  180. def test_write_exception(self):
  181. with patch.object(self._device._device, 'send', side_effect=[SSL.Error, socket.error]):
  182. with self.assertRaises(CommError):
  183. self._device.write('test')
  184. def test_read(self):
  185. with patch.object(socket.socket, '__init__', return_value=None):
  186. with patch.object(socket.socket, 'connect', return_value=None):
  187. self._device.open(no_reader_thread=True)
  188. with patch.object(socket.socket, 'recv') as mock:
  189. self._device.read()
  190. mock.assert_called_with(1)
  191. def test_read_exception(self):
  192. with patch.object(self._device._device, 'recv', side_effect=socket.error):
  193. with self.assertRaises(CommError):
  194. self._device.read()
  195. def test_read_line(self):
  196. with patch.object(self._device._device, 'recv', side_effect=list("testing\r\n")):
  197. ret = None
  198. try:
  199. ret = self._device.read_line()
  200. except StopIteration:
  201. pass
  202. self.assertEquals(ret, "testing")
  203. def test_read_line_timeout(self):
  204. with patch.object(self._device._device, 'recv', return_value='a') as mock:
  205. with self.assertRaises(TimeoutError):
  206. self._device.read_line(timeout=0.1)
  207. self.assertIn('a', self._device._buffer)
  208. def test_read_line_exception(self):
  209. with patch.object(self._device._device, 'recv', side_effect=socket.error):
  210. with self.assertRaises(CommError):
  211. self._device.read_line()
  212. with self.assertRaises(CommError):
  213. self._device.read_line()
  214. def test_ssl(self):
  215. ssl_key = crypto.PKey()
  216. ssl_key.generate_key(crypto.TYPE_RSA, 2048)
  217. ssl_cert = crypto.X509()
  218. ssl_cert.set_pubkey(ssl_key)
  219. ssl_ca_key = crypto.PKey()
  220. ssl_ca_key.generate_key(crypto.TYPE_RSA, 2048)
  221. ssl_ca_cert = crypto.X509()
  222. ssl_ca_cert.set_pubkey(ssl_ca_key)
  223. self._device.ssl = True
  224. self._device.ssl_key = ssl_key
  225. self._device.ssl_certificate = ssl_cert
  226. self._device.ssl_ca = ssl_ca_cert
  227. # ..there has to be a better way..
  228. with patch.object(socket.socket, '__init__', return_value=None):
  229. with patch.object(socket.socket, 'connect', return_value=None) as mock:
  230. with patch.object(socket.socket, '_sock'):
  231. with patch.object(socket.socket, 'fileno', return_value=1):
  232. self._device.open(no_reader_thread=True)
  233. mock.assert_called_with(self._device.interface)
  234. self.assertIsInstance(self._device._device, SSL.Connection)
  235. def test_ssl_exception(self):
  236. self._device.ssl = True
  237. self._device.ssl_key = 'None'
  238. self._device.ssl_certificate = 'None'
  239. self._device.ssl_ca = 'None'
  240. # ..there has to be a better way..
  241. with patch.object(socket.socket, '__init__', return_value=None):
  242. with patch.object(socket.socket, 'connect', return_value=None) as mock:
  243. with patch.object(socket.socket, '_sock'):
  244. with patch.object(socket.socket, 'fileno', return_value=1):
  245. with self.assertRaises(CommError):
  246. self._device.open(no_reader_thread=True)