A clone of https://github.com/nutechsoftware/alarmdecoder. This is required because they dropped support for older firmware releases without building in backward-compatibility code, and they had previously hardcoded pyserial to a Python 2-only version.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

324 lines
13 KiB

  1. import time
  2. from unittest import TestCase
  3. from mock import Mock, MagicMock, patch
  4. from ..ad2 import Overseer, AD2
  5. from ..devices import USBDevice
  6. from ..messages import Message, RFMessage, LRRMessage, ExpanderMessage
  7. from ..event.event import Event, EventHandler
  8. from ..zonetracking import Zonetracker
  9. class TestOverseer(TestCase):
  10. def setUp(self):
  11. self._attached = False
  12. self._detached = False
  13. with patch.object(USBDevice, 'find_all', return_value=[(0, 0, 'AD2', 1, 'AD2')]):
  14. self._overseer = Overseer()
  15. def tearDown(self):
  16. self._overseer.stop()
  17. def attached_event(self, sender, args):
  18. self._attached = True
  19. def detached_event(self, sender, args):
  20. self._detached = True
  21. def test_find_all(self):
  22. with patch.object(USBDevice, 'find_all', return_value=[(0, 0, 'AD2', 1, 'AD2')]):
  23. devices = Overseer.find_all()
  24. self.assertEquals(devices[0][2], 'AD2')
  25. def test_create_default_param(self):
  26. with patch.object(USBDevice, 'find_all', return_value=[(0, 0, 'AD2', 1, 'AD2')]):
  27. device = Overseer.create()
  28. self.assertEquals(device._device.interface, ('AD2', 0))
  29. def test_create_with_param(self):
  30. with patch.object(USBDevice, 'find_all', return_value=[(0, 0, 'AD2-1', 1, 'AD2'), (0, 0, 'AD2-2', 1, 'AD2')]):
  31. device = Overseer.create((0, 0, 'AD2-1', 1, 'AD2'))
  32. self.assertEquals(device._device.interface, ('AD2-1', 0))
  33. device = Overseer.create((0, 0, 'AD2-2', 1, 'AD2'))
  34. self.assertEquals(device._device.interface, ('AD2-2', 0))
  35. def test_events(self):
  36. self.assertEquals(self._attached, False)
  37. self.assertEquals(self._detached, False)
  38. # this is ugly, but it works.
  39. self._overseer.stop()
  40. self._overseer._detect_thread = Overseer.DetectThread(self._overseer)
  41. self._overseer.on_attached += self.attached_event
  42. self._overseer.on_detached += self.detached_event
  43. with patch.object(USBDevice, 'find_all', return_value=[(0, 0, 'AD2-1', 1, 'AD2'), (0, 0, 'AD2-2', 1, 'AD2')]):
  44. self._overseer.start()
  45. with patch.object(USBDevice, 'find_all', return_value=[(0, 0, 'AD2-2', 1, 'AD2')]):
  46. Overseer.find_all()
  47. time.sleep(1)
  48. self._overseer.stop()
  49. self.assertEquals(self._attached, True)
  50. self.assertEquals(self._detached, True)
  51. class TestAD2(TestCase):
  52. def setUp(self):
  53. self._panicked = False
  54. self._relay_changed = False
  55. self._power_changed = False
  56. self._alarmed = False
  57. self._bypassed = False
  58. self._battery = (False, 0)
  59. self._fire = (False, 0)
  60. self._armed = False
  61. self._got_config = False
  62. self._message_received = False
  63. self._rfx_message_received = False
  64. self._lrr_message_received = False
  65. self._device = Mock(spec=USBDevice)
  66. self._device.on_open = EventHandler(Event(), self._device)
  67. self._device.on_close = EventHandler(Event(), self._device)
  68. self._device.on_read = EventHandler(Event(), self._device)
  69. self._device.on_write = EventHandler(Event(), self._device)
  70. self._ad2 = AD2(self._device)
  71. self._ad2._zonetracker = Mock(spec=Zonetracker)
  72. self._ad2._zonetracker.on_fault = EventHandler(Event(), self._ad2._zonetracker)
  73. self._ad2._zonetracker.on_restore = EventHandler(Event(), self._ad2._zonetracker)
  74. self._ad2.on_panic += self.on_panic
  75. self._ad2.on_relay_changed += self.on_relay_changed
  76. self._ad2.on_power_changed += self.on_power_changed
  77. self._ad2.on_alarm += self.on_alarm
  78. self._ad2.on_bypass += self.on_bypass
  79. self._ad2.on_low_battery += self.on_battery
  80. self._ad2.on_fire += self.on_fire
  81. self._ad2.on_arm += self.on_arm
  82. self._ad2.on_disarm += self.on_disarm
  83. self._ad2.on_config_received += self.on_config
  84. self._ad2.on_message += self.on_message
  85. self._ad2.on_rfx_message += self.on_rfx_message
  86. self._ad2.on_lrr_message += self.on_lrr_message
  87. self._ad2.address_mask = int('ffffffff', 16)
  88. self._ad2.open()
  89. def tearDown(self):
  90. pass
  91. def on_panic(self, sender, args):
  92. self._panicked = args
  93. def on_relay_changed(self, sender, args):
  94. self._relay_changed = True
  95. def on_power_changed(self, sender, args):
  96. self._power_changed = args
  97. def on_alarm(self, sender, args):
  98. self._alarmed = args
  99. def on_bypass(self, sender, args):
  100. self._bypassed = args
  101. def on_battery(self, sender, args):
  102. self._battery = args
  103. def on_fire(self, sender, args):
  104. self._fire = args
  105. def on_arm(self, sender, args):
  106. self._armed = True
  107. def on_disarm(self, sender, args):
  108. self._armed = False
  109. def on_config(self, sender, args):
  110. self._got_config = True
  111. def on_message(self, sender, args):
  112. self._message_received = True
  113. def on_rfx_message(self, sender, args):
  114. self._rfx_message_received = True
  115. def on_lrr_message(self, sender, args):
  116. self._lrr_message_received = True
  117. def test_open(self):
  118. self._ad2.open()
  119. self._device.open.assert_any_calls()
  120. def test_close(self):
  121. self._ad2.open()
  122. self._ad2.close()
  123. self._device.close.assert_any_calls()
  124. def test_send(self):
  125. self._ad2.send('test')
  126. self._device.write.assert_called_with('test')
  127. def test_get_config(self):
  128. self._ad2.get_config()
  129. self._device.write.assert_called_with("C\r")
  130. def test_save_config(self):
  131. self._ad2.save_config()
  132. self._device.write.assert_any_calls()
  133. def test_reboot(self):
  134. self._ad2.reboot()
  135. self._device.write.assert_called_with('=')
  136. def test_fault(self):
  137. self._ad2.fault_zone(1)
  138. self._device.write.assert_called_with("L{0:02}{1}\r".format(1, 1))
  139. def test_fault_wireproblem(self):
  140. self._ad2.fault_zone(1, simulate_wire_problem=True)
  141. self._device.write.assert_called_with("L{0:02}{1}\r".format(1, 2))
  142. def test_clear_zone(self):
  143. self._ad2.clear_zone(1)
  144. self._device.write.assert_called_with("L{0:02}0\r".format(1))
  145. def test_message(self):
  146. msg = self._ad2._handle_message('[0000000000000000----],000,[f707000600e5800c0c020000]," "')
  147. self.assertIsInstance(msg, Message)
  148. self._ad2._on_read(self, '[0000000000000000----],000,[f707000600e5800c0c020000]," "')
  149. self.assertTrue(self._message_received)
  150. def test_message_kpe(self):
  151. msg = self._ad2._handle_message('!KPE:[0000000000000000----],000,[f707000600e5800c0c020000]," "')
  152. self.assertIsInstance(msg, Message)
  153. self._ad2._on_read(self, '[0000000000000000----],000,[f707000600e5800c0c020000]," "')
  154. self.assertTrue(self._message_received)
  155. def test_expander_message(self):
  156. msg = self._ad2._handle_message('!EXP:07,01,01')
  157. self.assertIsInstance(msg, ExpanderMessage)
  158. def test_relay_message(self):
  159. self._ad2.open()
  160. msg = self._ad2._handle_message('!REL:12,01,01')
  161. self.assertIsInstance(msg, ExpanderMessage)
  162. self.assertEquals(self._relay_changed, True)
  163. def test_rfx_message(self):
  164. msg = self._ad2._handle_message('!RFX:0180036,80')
  165. self.assertIsInstance(msg, RFMessage)
  166. self.assertTrue(self._rfx_message_received)
  167. def test_panic(self):
  168. self._ad2.open()
  169. msg = self._ad2._handle_message('!LRR:012,1,ALARM_PANIC')
  170. self.assertEquals(self._panicked, True)
  171. msg = self._ad2._handle_message('!LRR:012,1,CANCEL')
  172. self.assertEquals(self._panicked, False)
  173. self.assertIsInstance(msg, LRRMessage)
  174. def test_config_message(self):
  175. self._ad2.open()
  176. msg = self._ad2._handle_message('!CONFIG>ADDRESS=18&CONFIGBITS=ff00&LRR=N&EXP=NNNNN&REL=NNNN&MASK=ffffffff&DEDUPLICATE=N')
  177. self.assertEquals(self._ad2.address, 18)
  178. self.assertEquals(self._ad2.configbits, int('ff00', 16))
  179. self.assertEquals(self._ad2.address_mask, int('ffffffff', 16))
  180. self.assertEquals(self._ad2.emulate_zone, [False for x in range(5)])
  181. self.assertEquals(self._ad2.emulate_relay, [False for x in range(4)])
  182. self.assertEquals(self._ad2.emulate_lrr, False)
  183. self.assertEquals(self._ad2.deduplicate, False)
  184. self.assertEquals(self._got_config, True)
  185. def test_power_changed_event(self):
  186. msg = self._ad2._handle_message('[0000000100000000----],000,[f707000600e5800c0c020000]," "')
  187. self.assertEquals(self._power_changed, False) # Not set first time we hit it.
  188. msg = self._ad2._handle_message('[0000000000000000----],000,[f707000600e5800c0c020000]," "')
  189. self.assertEquals(self._power_changed, False)
  190. msg = self._ad2._handle_message('[0000000100000000----],000,[f707000600e5800c0c020000]," "')
  191. self.assertEquals(self._power_changed, True)
  192. def test_alarm_event(self):
  193. msg = self._ad2._handle_message('[0000000000100000----],000,[f707000600e5800c0c020000]," "')
  194. self.assertEquals(self._alarmed, False) # Not set first time we hit it.
  195. msg = self._ad2._handle_message('[0000000000000000----],000,[f707000600e5800c0c020000]," "')
  196. self.assertEquals(self._alarmed, False)
  197. msg = self._ad2._handle_message('[0000000000100000----],000,[f707000600e5800c0c020000]," "')
  198. self.assertEquals(self._alarmed, True)
  199. def test_zone_bypassed_event(self):
  200. msg = self._ad2._handle_message('[0000001000000000----],000,[f707000600e5800c0c020000]," "')
  201. self.assertEquals(self._bypassed, False) # Not set first time we hit it.
  202. msg = self._ad2._handle_message('[0000000000000000----],000,[f707000600e5800c0c020000]," "')
  203. self.assertEquals(self._bypassed, False)
  204. msg = self._ad2._handle_message('[0000001000000000----],000,[f707000600e5800c0c020000]," "')
  205. self.assertEquals(self._bypassed, True)
  206. def test_armed_away_event(self):
  207. msg = self._ad2._handle_message('[0100000000000000----],000,[f707000600e5800c0c020000]," "')
  208. self.assertEquals(self._armed, False) # Not set first time we hit it.
  209. msg = self._ad2._handle_message('[0000000000000000----],000,[f707000600e5800c0c020000]," "')
  210. self.assertEquals(self._armed, False)
  211. msg = self._ad2._handle_message('[0100000000000000----],000,[f707000600e5800c0c020000]," "')
  212. self.assertEquals(self._armed, True)
  213. self._armed = False
  214. msg = self._ad2._handle_message('[0010000000000000----],000,[f707000600e5800c0c020000]," "')
  215. self.assertEquals(self._armed, False) # Not set first time we hit it.
  216. msg = self._ad2._handle_message('[0000000000000000----],000,[f707000600e5800c0c020000]," "')
  217. self.assertEquals(self._armed, False)
  218. msg = self._ad2._handle_message('[0010000000000000----],000,[f707000600e5800c0c020000]," "')
  219. self.assertEquals(self._armed, True)
  220. def test_battery_low_event(self):
  221. msg = self._ad2._handle_message('[0000000000010000----],000,[f707000600e5800c0c020000]," "')
  222. self.assertEquals(self._battery[0], True)
  223. # force the timeout to expire.
  224. with patch.object(time, 'time', return_value=self._battery[1] + 35):
  225. msg = self._ad2._handle_message('[0000000000000000----],000,[f707000600e5800c0c020000]," "')
  226. self.assertEquals(self._battery[0], False)
  227. def test_fire_alarm_event(self):
  228. msg = self._ad2._handle_message('[0000000000000100----],000,[f707000600e5800c0c020000]," "')
  229. self.assertEquals(self._fire[0], True)
  230. # force the timeout to expire.
  231. with patch.object(time, 'time', return_value=self._fire[1] + 35):
  232. msg = self._ad2._handle_message('[0000000000000000----],000,[f707000600e5800c0c020000]," "')
  233. self.assertEquals(self._fire[0], False)
  234. def test_hit_for_faults(self):
  235. self._ad2._handle_message('[0000000000000000----],000,[f707000600e5800c0c020000],"Hit * for faults "')
  236. self._ad2._device.write.assert_called_with('*')
  237. def test_zonetracker_update(self):
  238. msg = self._ad2._handle_message('[0000000000000000----],000,[f707000600e5800c0c020000]," "')
  239. self._ad2._zonetracker.update.assert_called_with(msg)