A clone of https://github.com/nutechsoftware/alarmdecoder. This clone is required because upstream dropped support for older firmware releases without building in backward-compatibility code, and had previously hardcoded pyserial to a Python 2-only version.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

557 lines
20 KiB

  1. from unittest import TestCase
  2. from mock import Mock, MagicMock, patch
  3. from serial import Serial, SerialException
  4. try:
  5. from pyftdi.pyftdi.ftdi import Ftdi, FtdiError
  6. except:
  7. from pyftdi.ftdi import Ftdi, FtdiError
  8. from usb.core import USBError, Device as USBCoreDevice
  9. import sys
  10. import socket
  11. import time
  12. import tempfile
  13. import os
  14. import select
  15. from alarmdecoder.devices import USBDevice, SerialDevice, SocketDevice
  16. from alarmdecoder.util import NoDeviceError, CommError, TimeoutError
  17. # Optional FTDI tests
  18. try:
  19. from pyftdi.pyftdi.ftdi import Ftdi, FtdiError
  20. from usb.core import USBError, Device as USBCoreDevice
  21. have_pyftdi = True
  22. except ImportError:
  23. have_pyftdi = False
  24. # Optional SSL tests
  25. try:
  26. from OpenSSL import SSL, crypto
  27. have_openssl = True
  28. except ImportError:
  29. have_openssl = False
  30. class TestUSBDevice(TestCase):
  31. def setUp(self):
  32. self._device = USBDevice()
  33. self._device._device = Mock(spec=Ftdi)
  34. self._device._device.usb_dev = Mock(spec=USBCoreDevice)
  35. self._device._device.usb_dev.bus = 0
  36. self._device._device.usb_dev.address = 0
  37. self._attached = False
  38. self._detached = False
  39. def tearDown(self):
  40. self._device.close()
  41. ### Library events
  42. def attached_event(self, sender, *args, **kwargs):
  43. self._attached = True
  44. def detached_event(self, sender, *args, **kwargs):
  45. self._detached = True
  46. ### Tests
  47. def test_find_default_param(self):
  48. with patch.object(Ftdi, 'find_all', return_value=[(0, 0, 'AD2', 1, 'AD2')]):
  49. device = USBDevice.find()
  50. self.assertEqual(device.interface, 'AD2')
  51. def test_find_with_param(self):
  52. with patch.object(Ftdi, 'find_all', return_value=[(0, 0, 'AD2-1', 1, 'AD2'), (0, 0, 'AD2-2', 1, 'AD2')]):
  53. device = USBDevice.find((0, 0, 'AD2-1', 1, 'AD2'))
  54. self.assertEqual(device.interface, 'AD2-1')
  55. device = USBDevice.find((0, 0, 'AD2-2', 1, 'AD2'))
  56. self.assertEqual(device.interface, 'AD2-2')
  57. def test_events(self):
  58. self.assertFalse(self._attached)
  59. self.assertFalse(self._detached)
  60. # this is ugly, but it works.
  61. with patch.object(USBDevice, 'find_all', return_value=[(0, 0, 'AD2-1', 1, 'AD2'), (0, 0, 'AD2-2', 1, 'AD2')]):
  62. USBDevice.start_detection(on_attached=self.attached_event, on_detached=self.detached_event)
  63. with patch.object(USBDevice, 'find_all', return_value=[(0, 0, 'AD2-2', 1, 'AD2')]):
  64. USBDevice.find_all()
  65. time.sleep(1)
  66. USBDevice.stop_detection()
  67. self.assertTrue(self._attached)
  68. self.assertTrue(self._detached)
  69. def test_find_all(self):
  70. with patch.object(USBDevice, 'find_all', return_value=[]) as mock:
  71. devices = USBDevice.find_all()
  72. self.assertEqual(devices, [])
  73. def test_find_all_exception(self):
  74. with patch.object(Ftdi, 'find_all', side_effect=[USBError('testing'), FtdiError]) as mock:
  75. with self.assertRaises(CommError):
  76. devices = USBDevice.find_all()
  77. with self.assertRaises(CommError):
  78. devices = USBDevice.find_all()
  79. def test_interface_serial_number(self):
  80. self._device.interface = 'AD2USB'
  81. self.assertEqual(self._device.interface, 'AD2USB')
  82. self.assertEqual(self._device.serial_number, 'AD2USB')
  83. self.assertEqual(self._device._device_number, 0)
  84. def test_interface_index(self):
  85. self._device.interface = 1
  86. self.assertEqual(self._device.interface, 1)
  87. self.assertEqual(self._device.serial_number, None)
  88. self.assertEqual(self._device._device_number, 1)
  89. def test_open(self):
  90. self._device.interface = 'AD2USB'
  91. with patch.object(self._device._device, 'open') as mock:
  92. self._device.open(no_reader_thread=True)
  93. mock.assert_any_calls()
  94. def test_open_failed(self):
  95. self._device.interface = 'AD2USB'
  96. with patch.object(self._device._device, 'open', side_effect=[USBError('testing'), FtdiError]):
  97. with self.assertRaises(NoDeviceError):
  98. self._device.open(no_reader_thread=True)
  99. with self.assertRaises(NoDeviceError):
  100. self._device.open(no_reader_thread=True)
  101. def test_write(self):
  102. self._device.interface = 'AD2USB'
  103. self._device.open(no_reader_thread=True)
  104. with patch.object(self._device._device, 'write_data') as mock:
  105. self._device.write('test')
  106. mock.assert_called_with('test')
  107. class TestSerialDevice(TestCase):
  108. def setUp(self):
  109. self._device = SerialDevice()
  110. self._device._device = Mock(spec=Serial)
  111. self._device._device.open = Mock()
  112. def tearDown(self):
  113. self._device.close()
  114. ### Tests
  115. def test_open(self):
  116. self._device.interface = '/dev/ttyS0'
  117. with patch.object(self._device._device, 'open') as mock:
  118. self._device.open(no_reader_thread=True)
  119. mock.assert_called_with()
  120. def test_open_no_interface(self):
  121. with self.assertRaises(NoDeviceError):
  122. self._device.open(no_reader_thread=True)
  123. self.assertFalse(self._device._running)
  124. def test_open_failed(self):
  125. self._device.interface = '/dev/ttyS0'
  126. with patch.object(self._device._device, 'open', side_effect=[SerialException, ValueError]):
  127. with self.assertRaises(NoDeviceError):
  128. self._device.open(no_reader_thread=True)
  129. with self.assertRaises(NoDeviceError):
  130. self._device.open(no_reader_thread=True)
  131. def test_write(self):
  132. self._device.interface = '/dev/ttyS0'
  133. self._device.open(no_reader_thread=True)
  134. with patch.object(self._device._device, 'write') as mock:
  135. self._device.write(b'test')
  136. mock.assert_called_with(b'test')
  137. def test_write_exception(self):
  138. with patch.object(self._device._device, 'write', side_effect=SerialException):
  139. with self.assertRaises(CommError):
  140. self._device.write(b'test')
  141. def test_read(self):
  142. self._device.interface = '/dev/ttyS0'
  143. self._device.open(no_reader_thread=True)
  144. with patch.object(self._device._device, 'read') as mock:
  145. with patch('serial.Serial.fileno', return_value=1):
  146. with patch.object(select, 'select', return_value=[[1], [], []]):
  147. ret = self._device.read()
  148. mock.assert_called_with(1)
  149. def test_read_exception(self):
  150. with patch.object(self._device._device, 'read', side_effect=SerialException):
  151. with patch('serial.Serial.fileno', return_value=1):
  152. with patch.object(select, 'select', return_value=[[1], [], []]):
  153. with self.assertRaises(CommError):
  154. self._device.read()
  155. def test_read_line(self):
  156. side_effect = list("testing\r\n")
  157. if sys.version_info > (3,):
  158. side_effect = [chr(x).encode('utf-8') for x in b"testing\r\n"]
  159. with patch.object(self._device._device, 'read', side_effect=side_effect):
  160. with patch('serial.Serial.fileno', return_value=1):
  161. with patch.object(select, 'select', return_value=[[1], [], []]):
  162. ret = None
  163. try:
  164. ret = self._device.read_line()
  165. except StopIteration:
  166. pass
  167. self.assertEquals(ret, "testing")
  168. def test_read_line_timeout(self):
  169. with patch.object(self._device._device, 'read', return_value=b'a') as mock:
  170. with patch('serial.Serial.fileno', return_value=1):
  171. with patch.object(select, 'select', return_value=[[1], [], []]):
  172. with self.assertRaises(TimeoutError):
  173. self._device.read_line(timeout=0.1)
  174. self.assertIn('a', self._device._buffer.decode('utf-8'))
  175. def test_read_line_exception(self):
  176. with patch.object(self._device._device, 'read', side_effect=[OSError, SerialException]):
  177. with patch('serial.Serial.fileno', return_value=1):
  178. with patch.object(select, 'select', return_value=[[1], [], []]):
  179. with self.assertRaises(CommError):
  180. self._device.read_line()
  181. with self.assertRaises(CommError):
  182. self._device.read_line()
  183. class TestSocketDevice(TestCase):
  184. def setUp(self):
  185. self._device = SocketDevice()
  186. self._device._device = Mock(spec=socket.socket)
  187. def tearDown(self):
  188. self._device.close()
  189. ### Tests
  190. def test_open(self):
  191. with patch.object(socket.socket, '__init__', return_value=None):
  192. with patch.object(socket.socket, 'connect', return_value=None) as mock:
  193. self._device.open(no_reader_thread=True)
  194. mock.assert_called_with(self._device.interface)
  195. def test_open_failed(self):
  196. with patch.object(socket.socket, 'connect', side_effect=socket.error):
  197. with self.assertRaises(NoDeviceError):
  198. self._device.open(no_reader_thread=True)
  199. def test_write(self):
  200. with patch.object(socket.socket, '__init__', return_value=None):
  201. with patch.object(socket.socket, 'connect', return_value=None):
  202. self._device.open(no_reader_thread=True)
  203. with patch.object(socket.socket, 'send') as mock:
  204. self._device.write(b'test')
  205. mock.assert_called_with(b'test')
  206. def test_write_exception(self):
  207. side_effects = [socket.error]
  208. if (have_openssl):
  209. side_effects.append(SSL.Error)
  210. with patch.object(self._device._device, 'send', side_effect=side_effects):
  211. with self.assertRaises(CommError):
  212. self._device.write(b'test')
  213. def test_read(self):
  214. with patch.object(socket.socket, '__init__', return_value=None):
  215. with patch.object(socket.socket, 'connect', return_value=None):
  216. self._device.open(no_reader_thread=True)
  217. with patch('socket.socket.fileno', return_value=1):
  218. with patch.object(select, 'select', return_value=[[1], [], []]):
  219. with patch.object(socket.socket, 'recv') as mock:
  220. self._device.read()
  221. mock.assert_called_with(1)
  222. def test_read_exception(self):
  223. with patch('socket.socket.fileno', return_value=1):
  224. with patch.object(select, 'select', return_value=[[1], [], []]):
  225. with patch.object(self._device._device, 'recv', side_effect=socket.error):
  226. with self.assertRaises(CommError):
  227. self._device.read()
  228. def test_read_line(self):
  229. side_effect = list("testing\r\n")
  230. if sys.version_info > (3,):
  231. side_effect = [chr(x).encode('utf-8') for x in b"testing\r\n"]
  232. with patch('socket.socket.fileno', return_value=1):
  233. with patch.object(select, 'select', return_value=[[1], [], []]):
  234. with patch.object(self._device._device, 'recv', side_effect=side_effect):
  235. ret = None
  236. try:
  237. ret = self._device.read_line()
  238. except StopIteration:
  239. pass
  240. self.assertEquals(ret, "testing")
  241. def test_read_line_timeout(self):
  242. with patch('socket.socket.fileno', return_value=1):
  243. with patch.object(select, 'select', return_value=[[1], [], []]):
  244. with patch.object(self._device._device, 'recv', return_value=b'a') as mock:
  245. with self.assertRaises(TimeoutError):
  246. self._device.read_line(timeout=0.1)
  247. self.assertIn('a', self._device._buffer.decode('utf-8'))
  248. def test_read_line_exception(self):
  249. with patch('socket.socket.fileno', return_value=1):
  250. with patch.object(select, 'select', return_value=[[1], [], []]):
  251. with patch.object(self._device._device, 'recv', side_effect=socket.error):
  252. with self.assertRaises(CommError):
  253. self._device.read_line()
  254. with self.assertRaises(CommError):
  255. self._device.read_line()
  256. def test_ssl(self):
  257. if not have_openssl:
  258. return
  259. ssl_key = crypto.PKey()
  260. ssl_key.generate_key(crypto.TYPE_RSA, 2048)
  261. ssl_cert = crypto.X509()
  262. ssl_cert.set_pubkey(ssl_key)
  263. ssl_ca_key = crypto.PKey()
  264. ssl_ca_key.generate_key(crypto.TYPE_RSA, 2048)
  265. ssl_ca_cert = crypto.X509()
  266. ssl_ca_cert.set_pubkey(ssl_ca_key)
  267. self._device.ssl = True
  268. self._device.ssl_key = ssl_key
  269. self._device.ssl_certificate = ssl_cert
  270. self._device.ssl_ca = ssl_ca_cert
  271. fileno, path = tempfile.mkstemp()
  272. # ..there has to be a better way..
  273. with patch.object(socket.socket, '__init__', return_value=None):
  274. with patch.object(socket.socket, 'connect', return_value=None) as mock:
  275. with patch.object(socket.socket, '_sock'):
  276. with patch.object(socket.socket, 'fileno', return_value=fileno):
  277. try:
  278. self._device.open(no_reader_thread=True)
  279. except SSL.SysCallError as ex:
  280. pass
  281. os.close(fileno)
  282. os.unlink(path)
  283. mock.assert_called_with(self._device.interface)
  284. self.assertIsInstance(self._device._device, SSL.Connection)
  285. def test_ssl_exception(self):
  286. if not have_openssl:
  287. return
  288. self._device.ssl = True
  289. self._device.ssl_key = 'None'
  290. self._device.ssl_certificate = 'None'
  291. self._device.ssl_ca = 'None'
  292. fileno, path = tempfile.mkstemp()
  293. # ..there has to be a better way..
  294. with patch.object(socket.socket, '__init__', return_value=None):
  295. with patch.object(socket.socket, 'connect', return_value=None) as mock:
  296. with patch.object(socket.socket, '_sock'):
  297. with patch.object(socket.socket, 'fileno', return_value=fileno):
  298. with self.assertRaises(CommError):
  299. try:
  300. self._device.open(no_reader_thread=True)
  301. except SSL.SysCallError as ex:
  302. pass
  303. os.close(fileno)
  304. os.unlink(path)
  305. if have_pyftdi:
  306. class TestUSBDevice(TestCase):
  307. def setUp(self):
  308. self._device = USBDevice()
  309. self._device._device = Mock(spec=Ftdi)
  310. self._device._device.usb_dev = Mock(spec=USBCoreDevice)
  311. self._device._device.usb_dev.bus = 0
  312. self._device._device.usb_dev.address = 0
  313. self._attached = False
  314. self._detached = False
  315. def tearDown(self):
  316. self._device.close()
  317. ### Library events
  318. def attached_event(self, sender, *args, **kwargs):
  319. self._attached = True
  320. def detached_event(self, sender, *args, **kwargs):
  321. self._detached = True
  322. ### Tests
  323. def test_find_default_param(self):
  324. with patch.object(Ftdi, 'find_all', return_value=[(0, 0, 'AD2', 1, 'AD2')]):
  325. device = USBDevice.find()
  326. self.assertEquals(device.interface, 'AD2')
  327. def test_find_with_param(self):
  328. with patch.object(Ftdi, 'find_all', return_value=[(0, 0, 'AD2-1', 1, 'AD2'), (0, 0, 'AD2-2', 1, 'AD2')]):
  329. device = USBDevice.find((0, 0, 'AD2-1', 1, 'AD2'))
  330. self.assertEquals(device.interface, 'AD2-1')
  331. device = USBDevice.find((0, 0, 'AD2-2', 1, 'AD2'))
  332. self.assertEquals(device.interface, 'AD2-2')
  333. def test_events(self):
  334. self.assertFalse(self._attached)
  335. self.assertFalse(self._detached)
  336. # this is ugly, but it works.
  337. with patch.object(USBDevice, 'find_all', return_value=[(0, 0, 'AD2-1', 1, 'AD2'), (0, 0, 'AD2-2', 1, 'AD2')]):
  338. USBDevice.start_detection(on_attached=self.attached_event, on_detached=self.detached_event)
  339. with patch.object(USBDevice, 'find_all', return_value=[(0, 0, 'AD2-2', 1, 'AD2')]):
  340. USBDevice.find_all()
  341. time.sleep(1)
  342. USBDevice.stop_detection()
  343. self.assertTrue(self._attached)
  344. self.assertTrue(self._detached)
  345. def test_find_all(self):
  346. with patch.object(USBDevice, 'find_all', return_value=[]) as mock:
  347. devices = USBDevice.find_all()
  348. self.assertEquals(devices, [])
  349. def test_find_all_exception(self):
  350. with patch.object(Ftdi, 'find_all', side_effect=[USBError('testing'), FtdiError]) as mock:
  351. with self.assertRaises(CommError):
  352. devices = USBDevice.find_all()
  353. with self.assertRaises(CommError):
  354. devices = USBDevice.find_all()
  355. def test_interface_serial_number(self):
  356. self._device.interface = 'AD2USB'
  357. self.assertEquals(self._device.interface, 'AD2USB')
  358. self.assertEquals(self._device.serial_number, 'AD2USB')
  359. self.assertEquals(self._device._device_number, 0)
  360. def test_interface_index(self):
  361. self._device.interface = 1
  362. self.assertEquals(self._device.interface, 1)
  363. self.assertEquals(self._device.serial_number, None)
  364. self.assertEquals(self._device._device_number, 1)
  365. def test_open(self):
  366. self._device.interface = 'AD2USB'
  367. with patch.object(self._device._device, 'open') as mock:
  368. self._device.open(no_reader_thread=True)
  369. mock.assert_any_calls()
  370. def test_open_failed(self):
  371. self._device.interface = 'AD2USB'
  372. with patch.object(self._device._device, 'open', side_effect=[USBError('testing'), FtdiError]):
  373. with self.assertRaises(NoDeviceError):
  374. self._device.open(no_reader_thread=True)
  375. with self.assertRaises(NoDeviceError):
  376. self._device.open(no_reader_thread=True)
  377. def test_write(self):
  378. self._device.interface = 'AD2USB'
  379. self._device.open(no_reader_thread=True)
  380. with patch.object(self._device._device, 'write_data') as mock:
  381. self._device.write(b'test')
  382. mock.assert_called_with(b'test')
  383. def test_write_exception(self):
  384. with patch.object(self._device._device, 'write_data', side_effect=FtdiError):
  385. with self.assertRaises(CommError):
  386. self._device.write(b'test')
  387. def test_read(self):
  388. self._device.interface = 'AD2USB'
  389. self._device.open(no_reader_thread=True)
  390. with patch.object(self._device._device, 'read_data') as mock:
  391. self._device.read()
  392. mock.assert_called_with(1)
  393. def test_read_exception(self):
  394. with patch.object(self._device._device, 'read_data', side_effect=[USBError('testing'), FtdiError]):
  395. with self.assertRaises(CommError):
  396. self._device.read()
  397. with self.assertRaises(CommError):
  398. self._device.read()
  399. def test_read_line(self):
  400. with patch.object(self._device._device, 'read_data', side_effect=list("testing\r\n")):
  401. ret = None
  402. try:
  403. ret = self._device.read_line()
  404. except StopIteration:
  405. pass
  406. self.assertEquals(ret, b"testing")
  407. def test_read_line_timeout(self):
  408. with patch.object(self._device._device, 'read_data', return_value='a') as mock:
  409. with self.assertRaises(TimeoutError):
  410. self._device.read_line(timeout=0.1)
  411. self.assertIn('a', self._device._buffer)
  412. def test_read_line_exception(self):
  413. with patch.object(self._device._device, 'read_data', side_effect=[USBError('testing'), FtdiError]):
  414. with self.assertRaises(CommError):
  415. self._device.read_line()
  416. with self.assertRaises(CommError):
  417. self._device.read_line()