A clone of: https://github.com/nutechsoftware/alarmdecoder This is requires as they dropped support for older firmware releases w/o building in backward compatibility code, and they had previously hardcoded pyserial to a python2 only version.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

539 lines
19 KiB

  1. from unittest import TestCase
  2. from mock import Mock, MagicMock, patch
  3. from serial import Serial, SerialException
  4. try:
  5. from pyftdi.pyftdi.ftdi import Ftdi, FtdiError
  6. except:
  7. from pyftdi.ftdi import Ftdi, FtdiError
  8. from usb.core import USBError, Device as USBCoreDevice
  9. import socket
  10. import time
  11. import tempfile
  12. import os
  13. import select
  14. from alarmdecoder.devices import USBDevice, SerialDevice, SocketDevice
  15. from alarmdecoder.util import NoDeviceError, CommError, TimeoutError
  16. class TestUSBDevice(TestCase):
  17. def setUp(self):
  18. self._device = USBDevice()
  19. self._device._device = Mock(spec=Ftdi)
  20. self._device._device.usb_dev = Mock(spec=USBCoreDevice)
  21. self._device._device.usb_dev.bus = 0
  22. self._device._device.usb_dev.address = 0
  23. self._attached = False
  24. self._detached = False
  25. def tearDown(self):
  26. self._device.close()
  27. def attached_event(self, sender, *args, **kwargs):
  28. self._attached = True
  29. def detached_event(self, sender, *args, **kwargs):
  30. self._detached = True
  31. def test_find_default_param(self):
  32. with patch.object(Ftdi, 'find_all', return_value=[(0, 0, 'AD2', 1, 'AD2')]):
  33. device = USBDevice.find()
  34. self.assertEqual(device.interface, 'AD2')
  35. def test_find_with_param(self):
  36. with patch.object(Ftdi, 'find_all', return_value=[(0, 0, 'AD2-1', 1, 'AD2'), (0, 0, 'AD2-2', 1, 'AD2')]):
  37. device = USBDevice.find((0, 0, 'AD2-1', 1, 'AD2'))
  38. self.assertEqual(device.interface, 'AD2-1')
  39. device = USBDevice.find((0, 0, 'AD2-2', 1, 'AD2'))
  40. self.assertEqual(device.interface, 'AD2-2')
  41. def test_events(self):
  42. self.assertEqual(self._attached, False)
  43. self.assertEqual(self._detached, False)
  44. # this is ugly, but it works.
  45. with patch.object(USBDevice, 'find_all', return_value=[(0, 0, 'AD2-1', 1, 'AD2'), (0, 0, 'AD2-2', 1, 'AD2')]):
  46. USBDevice.start_detection(on_attached=self.attached_event, on_detached=self.detached_event)
  47. with patch.object(USBDevice, 'find_all', return_value=[(0, 0, 'AD2-2', 1, 'AD2')]):
  48. USBDevice.find_all()
  49. time.sleep(1)
  50. USBDevice.stop_detection()
  51. self.assertEqual(self._attached, True)
  52. self.assertEqual(self._detached, True)
  53. def test_find_all(self):
  54. with patch.object(USBDevice, 'find_all', return_value=[]) as mock:
  55. devices = USBDevice.find_all()
  56. self.assertEqual(devices, [])
  57. def test_find_all_exception(self):
  58. with patch.object(Ftdi, 'find_all', side_effect=[USBError('testing'), FtdiError]) as mock:
  59. with self.assertRaises(CommError):
  60. devices = USBDevice.find_all()
  61. with self.assertRaises(CommError):
  62. devices = USBDevice.find_all()
  63. def test_interface_serial_number(self):
  64. self._device.interface = 'AD2USB'
  65. self.assertEqual(self._device.interface, 'AD2USB')
  66. self.assertEqual(self._device.serial_number, 'AD2USB')
  67. self.assertEqual(self._device._device_number, 0)
  68. def test_interface_index(self):
  69. self._device.interface = 1
  70. self.assertEqual(self._device.interface, 1)
  71. self.assertEqual(self._device.serial_number, None)
  72. self.assertEqual(self._device._device_number, 1)
  73. def test_open(self):
  74. self._device.interface = 'AD2USB'
  75. with patch.object(self._device._device, 'open') as mock:
  76. self._device.open(no_reader_thread=True)
  77. mock.assert_any_calls()
  78. def test_open_failed(self):
  79. self._device.interface = 'AD2USB'
  80. with patch.object(self._device._device, 'open', side_effect=[USBError('testing'), FtdiError]):
  81. with self.assertRaises(NoDeviceError):
  82. self._device.open(no_reader_thread=True)
  83. with self.assertRaises(NoDeviceError):
  84. self._device.open(no_reader_thread=True)
  85. def test_write(self):
  86. self._device.interface = 'AD2USB'
  87. self._device.open(no_reader_thread=True)
  88. with patch.object(self._device._device, 'write_data') as mock:
  89. self._device.write('test')
  90. mock.assert_called_with('test')
  91. from unittest import TestCase
  92. from mock import Mock, MagicMock, patch
  93. from serial import Serial, SerialException
  94. from alarmdecoder.devices import USBDevice, SerialDevice, SocketDevice
  95. from alarmdecoder.util import NoDeviceError, CommError, TimeoutError
  96. # Optional FTDI tests
  97. try:
  98. from pyftdi.pyftdi.ftdi import Ftdi, FtdiError
  99. from usb.core import USBError, Device as USBCoreDevice
  100. have_pyftdi = True
  101. except ImportError:
  102. have_pyftdi = False
  103. # Optional SSL tests
  104. try:
  105. from OpenSSL import SSL, crypto
  106. have_openssl = True
  107. except ImportError:
  108. have_openssl = False
  109. class TestSerialDevice(TestCase):
  110. def setUp(self):
  111. self._device = SerialDevice()
  112. self._device._device = Mock(spec=Serial)
  113. self._device._device.open = Mock()
  114. def tearDown(self):
  115. self._device.close()
  116. def test_open(self):
  117. self._device.interface = '/dev/ttyS0'
  118. with patch.object(self._device._device, 'open') as mock:
  119. self._device.open(no_reader_thread=True)
  120. mock.assert_called_with()
  121. def test_open_no_interface(self):
  122. with self.assertRaises(NoDeviceError):
  123. self._device.open(no_reader_thread=True)
  124. self.assertFalse(self._device._running)
  125. def test_open_failed(self):
  126. self._device.interface = '/dev/ttyS0'
  127. with patch.object(self._device._device, 'open', side_effect=[SerialException, ValueError]):
  128. with self.assertRaises(NoDeviceError):
  129. self._device.open(no_reader_thread=True)
  130. with self.assertRaises(NoDeviceError):
  131. self._device.open(no_reader_thread=True)
  132. def test_write(self):
  133. self._device.interface = '/dev/ttyS0'
  134. self._device.open(no_reader_thread=True)
  135. with patch.object(self._device._device, 'write') as mock:
  136. self._device.write(b'test')
  137. mock.assert_called_with(b'test')
  138. def test_write_exception(self):
  139. with patch.object(self._device._device, 'write', side_effect=SerialException):
  140. with self.assertRaises(CommError):
  141. self._device.write(b'test')
  142. def test_read(self):
  143. self._device.interface = '/dev/ttyS0'
  144. self._device.open(no_reader_thread=True)
  145. with patch.object(self._device._device, 'read') as mock:
  146. self._device.read()
  147. mock.assert_called_with(1)
  148. def test_read_exception(self):
  149. with patch.object(self._device._device, 'read', side_effect=SerialException):
  150. with self.assertRaises(CommError):
  151. self._device.read()
  152. def test_read_line(self):
  153. with patch.object(self._device._device, 'read', side_effect=list("testing\r\n")):
  154. ret = None
  155. try:
  156. ret = self._device.read_line()
  157. except StopIteration:
  158. pass
  159. self.assertEquals(ret, b"testing")
  160. def test_read_line_timeout(self):
  161. with patch.object(self._device._device, 'read', return_value='a') as mock:
  162. with self.assertRaises(TimeoutError):
  163. self._device.read_line(timeout=0.1)
  164. self.assertIn('a', self._device._buffer.decode('utf-8'))
  165. def test_read_line_exception(self):
  166. with patch.object(self._device._device, 'read', side_effect=[OSError, SerialException]):
  167. with self.assertRaises(CommError):
  168. self._device.read_line()
  169. with self.assertRaises(CommError):
  170. self._device.read_line()
  171. class TestSocketDevice(TestCase):
  172. def setUp(self):
  173. self._device = SocketDevice()
  174. self._device._device = Mock(spec=socket.socket)
  175. def tearDown(self):
  176. self._device.close()
  177. def test_open(self):
  178. with patch.object(socket.socket, '__init__', return_value=None):
  179. with patch.object(socket.socket, 'connect', return_value=None) as mock:
  180. self._device.open(no_reader_thread=True)
  181. mock.assert_called_with(self._device.interface)
  182. def test_open_failed(self):
  183. with patch.object(socket.socket, 'connect', side_effect=socket.error):
  184. with self.assertRaises(NoDeviceError):
  185. self._device.open(no_reader_thread=True)
  186. def test_write(self):
  187. with patch.object(socket.socket, '__init__', return_value=None):
  188. with patch.object(socket.socket, 'connect', return_value=None):
  189. self._device.open(no_reader_thread=True)
  190. with patch.object(socket.socket, 'send') as mock:
  191. self._device.write(b'test')
  192. mock.assert_called_with(b'test')
  193. def test_write_exception(self):
  194. side_effects = [socket.error]
  195. if (have_openssl):
  196. side_effects.append(SSL.Error)
  197. with patch.object(self._device._device, 'send', side_effect=side_effects):
  198. with self.assertRaises(CommError):
  199. self._device.write(b'test')
  200. def test_read(self):
  201. with patch.object(socket.socket, '__init__', return_value=None):
  202. with patch.object(socket.socket, 'connect', return_value=None):
  203. self._device.open(no_reader_thread=True)
  204. with patch('socket.socket.fileno', return_value=1):
  205. with patch.object(select, 'select', return_value=[[1], [], []]):
  206. with patch.object(socket.socket, 'recv') as mock:
  207. self._device.read()
  208. mock.assert_called_with(1)
  209. def test_read_exception(self):
  210. with patch('socket.socket.fileno', return_value=1):
  211. with patch.object(select, 'select', return_value=[[1], [], []]):
  212. with patch.object(self._device._device, 'recv', side_effect=socket.error):
  213. with self.assertRaises(CommError):
  214. self._device.read()
  215. def test_read_line(self):
  216. with patch('socket.socket.fileno', return_value=1):
  217. with patch.object(select, 'select', return_value=[[1], [], []]):
  218. with patch.object(self._device._device, 'recv', side_effect=list("testing\r\n")):
  219. ret = None
  220. try:
  221. ret = self._device.read_line()
  222. except StopIteration:
  223. pass
  224. self.assertEquals(ret, b"testing")
  225. def test_read_line_timeout(self):
  226. with patch('socket.socket.fileno', return_value=1):
  227. with patch.object(select, 'select', return_value=[[1], [], []]):
  228. with patch.object(self._device._device, 'recv', return_value='a') as mock:
  229. with self.assertRaises(TimeoutError):
  230. self._device.read_line(timeout=0.1)
  231. self.assertIn('a', self._device._buffer.decode('utf-8'))
  232. def test_read_line_exception(self):
  233. with patch('socket.socket.fileno', return_value=1):
  234. with patch.object(select, 'select', return_value=[[1], [], []]):
  235. with patch.object(self._device._device, 'recv', side_effect=socket.error):
  236. with self.assertRaises(CommError):
  237. self._device.read_line()
  238. with self.assertRaises(CommError):
  239. self._device.read_line()
  240. def test_ssl(self):
  241. if not have_openssl:
  242. return
  243. ssl_key = crypto.PKey()
  244. ssl_key.generate_key(crypto.TYPE_RSA, 2048)
  245. ssl_cert = crypto.X509()
  246. ssl_cert.set_pubkey(ssl_key)
  247. ssl_ca_key = crypto.PKey()
  248. ssl_ca_key.generate_key(crypto.TYPE_RSA, 2048)
  249. ssl_ca_cert = crypto.X509()
  250. ssl_ca_cert.set_pubkey(ssl_ca_key)
  251. self._device.ssl = True
  252. self._device.ssl_key = ssl_key
  253. self._device.ssl_certificate = ssl_cert
  254. self._device.ssl_ca = ssl_ca_cert
  255. fileno, path = tempfile.mkstemp()
  256. # ..there has to be a better way..
  257. with patch.object(socket.socket, '__init__', return_value=None):
  258. with patch.object(socket.socket, 'connect', return_value=None) as mock:
  259. with patch.object(socket.socket, '_sock'):
  260. with patch.object(socket.socket, 'fileno', return_value=fileno):
  261. try:
  262. self._device.open(no_reader_thread=True)
  263. except SSL.SysCallError as ex:
  264. pass
  265. os.close(fileno)
  266. os.unlink(path)
  267. mock.assert_called_with(self._device.interface)
  268. self.assertIsInstance(self._device._device, SSL.Connection)
  269. def test_ssl_exception(self):
  270. if not have_openssl:
  271. return
  272. self._device.ssl = True
  273. self._device.ssl_key = 'None'
  274. self._device.ssl_certificate = 'None'
  275. self._device.ssl_ca = 'None'
  276. fileno, path = tempfile.mkstemp()
  277. # ..there has to be a better way..
  278. with patch.object(socket.socket, '__init__', return_value=None):
  279. with patch.object(socket.socket, 'connect', return_value=None) as mock:
  280. with patch.object(socket.socket, '_sock'):
  281. with patch.object(socket.socket, 'fileno', return_value=fileno):
  282. with self.assertRaises(CommError):
  283. try:
  284. self._device.open(no_reader_thread=True)
  285. except SSL.SysCallError as ex:
  286. pass
  287. os.close(fileno)
  288. os.unlink(path)
  289. if have_pyftdi:
  290. class TestUSBDevice(TestCase):
  291. def setUp(self):
  292. self._device = USBDevice()
  293. self._device._device = Mock(spec=Ftdi)
  294. self._device._device.usb_dev = Mock(spec=USBCoreDevice)
  295. self._device._device.usb_dev.bus = 0
  296. self._device._device.usb_dev.address = 0
  297. self._attached = False
  298. self._detached = False
  299. def tearDown(self):
  300. self._device.close()
  301. def attached_event(self, sender, *args, **kwargs):
  302. self._attached = True
  303. def detached_event(self, sender, *args, **kwargs):
  304. self._detached = True
  305. def test_find_default_param(self):
  306. with patch.object(Ftdi, 'find_all', return_value=[(0, 0, 'AD2', 1, 'AD2')]):
  307. device = USBDevice.find()
  308. self.assertEquals(device.interface, 'AD2')
  309. def test_find_with_param(self):
  310. with patch.object(Ftdi, 'find_all', return_value=[(0, 0, 'AD2-1', 1, 'AD2'), (0, 0, 'AD2-2', 1, 'AD2')]):
  311. device = USBDevice.find((0, 0, 'AD2-1', 1, 'AD2'))
  312. self.assertEquals(device.interface, 'AD2-1')
  313. device = USBDevice.find((0, 0, 'AD2-2', 1, 'AD2'))
  314. self.assertEquals(device.interface, 'AD2-2')
  315. def test_events(self):
  316. self.assertEquals(self._attached, False)
  317. self.assertEquals(self._detached, False)
  318. # this is ugly, but it works.
  319. with patch.object(USBDevice, 'find_all', return_value=[(0, 0, 'AD2-1', 1, 'AD2'), (0, 0, 'AD2-2', 1, 'AD2')]):
  320. USBDevice.start_detection(on_attached=self.attached_event, on_detached=self.detached_event)
  321. with patch.object(USBDevice, 'find_all', return_value=[(0, 0, 'AD2-2', 1, 'AD2')]):
  322. USBDevice.find_all()
  323. time.sleep(1)
  324. USBDevice.stop_detection()
  325. self.assertEquals(self._attached, True)
  326. self.assertEquals(self._detached, True)
  327. def test_find_all(self):
  328. with patch.object(USBDevice, 'find_all', return_value=[]) as mock:
  329. devices = USBDevice.find_all()
  330. self.assertEquals(devices, [])
  331. def test_find_all_exception(self):
  332. with patch.object(Ftdi, 'find_all', side_effect=[USBError('testing'), FtdiError]) as mock:
  333. with self.assertRaises(CommError):
  334. devices = USBDevice.find_all()
  335. with self.assertRaises(CommError):
  336. devices = USBDevice.find_all()
  337. def test_interface_serial_number(self):
  338. self._device.interface = 'AD2USB'
  339. self.assertEquals(self._device.interface, 'AD2USB')
  340. self.assertEquals(self._device.serial_number, 'AD2USB')
  341. self.assertEquals(self._device._device_number, 0)
  342. def test_interface_index(self):
  343. self._device.interface = 1
  344. self.assertEquals(self._device.interface, 1)
  345. self.assertEquals(self._device.serial_number, None)
  346. self.assertEquals(self._device._device_number, 1)
  347. def test_open(self):
  348. self._device.interface = 'AD2USB'
  349. with patch.object(self._device._device, 'open') as mock:
  350. self._device.open(no_reader_thread=True)
  351. mock.assert_any_calls()
  352. def test_open_failed(self):
  353. self._device.interface = 'AD2USB'
  354. with patch.object(self._device._device, 'open', side_effect=[USBError('testing'), FtdiError]):
  355. with self.assertRaises(NoDeviceError):
  356. self._device.open(no_reader_thread=True)
  357. with self.assertRaises(NoDeviceError):
  358. self._device.open(no_reader_thread=True)
  359. def test_write(self):
  360. self._device.interface = 'AD2USB'
  361. self._device.open(no_reader_thread=True)
  362. with patch.object(self._device._device, 'write_data') as mock:
  363. self._device.write(b'test')
  364. mock.assert_called_with(b'test')
  365. def test_write_exception(self):
  366. with patch.object(self._device._device, 'write_data', side_effect=FtdiError):
  367. with self.assertRaises(CommError):
  368. self._device.write(b'test')
  369. def test_read(self):
  370. self._device.interface = 'AD2USB'
  371. self._device.open(no_reader_thread=True)
  372. with patch.object(self._device._device, 'read_data') as mock:
  373. self._device.read()
  374. mock.assert_called_with(1)
  375. def test_read_exception(self):
  376. with patch.object(self._device._device, 'read_data', side_effect=[USBError('testing'), FtdiError]):
  377. with self.assertRaises(CommError):
  378. self._device.read()
  379. with self.assertRaises(CommError):
  380. self._device.read()
  381. def test_read_line(self):
  382. with patch.object(self._device._device, 'read_data', side_effect=list("testing\r\n")):
  383. ret = None
  384. try:
  385. ret = self._device.read_line()
  386. except StopIteration:
  387. pass
  388. self.assertEquals(ret, b"testing")
  389. def test_read_line_timeout(self):
  390. with patch.object(self._device._device, 'read_data', return_value='a') as mock:
  391. with self.assertRaises(TimeoutError):
  392. self._device.read_line(timeout=0.1)
  393. self.assertIn('a', self._device._buffer)
  394. def test_read_line_exception(self):
  395. with patch.object(self._device._device, 'read_data', side_effect=[USBError('testing'), FtdiError]):
  396. with self.assertRaises(CommError):
  397. self._device.read_line()
  398. with self.assertRaises(CommError):
  399. self._device.read_line()