Implement a secure ICS protocol targeting the LoRa Node151 microcontroller for controlling irrigation.

lora.py 5.9 KiB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246
  1. import asyncio
  2. import functools
  3. import os
  4. import unittest
  5. from Strobe.Strobe import Strobe
  6. from Strobe.Strobe import AuthenticationFailed
  7. domain = b'com.funkthat.lora.irrigation.shared.v0.0.1' # passed to Strobe() by both ends of the protocol
  8. # The response to a command will be the CMD and any arguments if needed.
  9. # The command is encoded as an unsigned byte.
  10. CMD_TERMINATE = 1 # no args: terminate the session, reply confirms
  11. # The following commands are queued up, and are acknowledged once queued
  12. CMD_WAITFOR = 2 # arg: (length): waits for length seconds
  13. CMD_RUNFOR = 3 # arg: (chan, length): turns on chan for length seconds
  14. class LORANode(object):
  15. '''Implement a LORANode initiator.'''
  16. def __init__(self, syncdatagram):
  17. self.sd = syncdatagram
  18. self.st = Strobe(domain)
  19. async def start(self):
  20. msg = self.st.send_enc(os.urandom(16) + b'reqreset') + \
  21. self.st.send_mac(8)
  22. resp = await self.sd.sendtillrecv(msg, 1)
  23. self.st.recv_enc(resp[:16])
  24. self.st.recv_mac(resp[16:])
  25. resp = await self.sd.sendtillrecv(
  26. self.st.send_enc(b'confirm') + self.st.send_mac(8), 1)
  27. pkt = self.st.recv_enc(resp[:9])
  28. self.st.recv_mac(resp[9:])
  29. if pkt != b'confirmed':
  30. raise RuntimeError
  31. @staticmethod
  32. def _encodeargs(*args):
  33. r = []
  34. for i in args:
  35. r.append(i.to_bytes(4, byteorder='little'))
  36. return b''.join(r)
  37. async def _sendcmd(self, cmd, *args):
  38. cmdbyte = cmd.to_bytes(1, byteorder='little')
  39. pkt = await self.sd.sendtillrecv(
  40. self.st.send_enc(cmdbyte +
  41. self._encodeargs(*args)) + self.st.send_mac(8), 1)
  42. resp = self.st.recv_enc(pkt[:-8])
  43. self.st.recv_mac(pkt[-8:])
  44. if resp[0:1] != cmdbyte:
  45. raise RuntimeError('response does not match, got: %s, expected: %s' % (repr(resp[0:1]), repr(cmdbyte)))
  46. async def waitfor(self, length):
  47. return await self._sendcmd(CMD_WAITFOR, length)
  48. async def runfor(self, chan, length):
  49. return await self._sendcmd(CMD_RUNFOR, chan, length)
  50. async def terminate(self):
  51. return await self._sendcmd(CMD_TERMINATE)
  52. class SyncDatagram(object):
  53. '''Base interface for a more simple synchronous interface.'''
  54. def __init__(self): #pragma: no cover
  55. pass
  56. async def recv(self, timeout=None): #pragma: no cover
  57. '''Receive a datagram. If timeout is not None, wait that many
  58. seconds, and if nothing is received in that time, raise an TimeoutError
  59. exception.'''
  60. raise NotImplementedError
  61. async def send(self, data): #pragma: no cover
  62. '''Send a datagram.'''
  63. raise NotImplementedError
  64. async def sendtillrecv(self, data, freq):
  65. '''Send the datagram in data, every freq seconds until a datagram
  66. is received. If timeout seconds happen w/o receiving a datagram,
  67. then raise an TimeoutError exception.'''
  68. while True:
  69. await self.send(data)
  70. try:
  71. return await self.recv(freq)
  72. except TimeoutError:
  73. pass
  74. class MockSyncDatagram(SyncDatagram):
  75. '''A testing version of SyncDatagram. Define a method runner which
  76. implements part of the sequence. In the function, await on either
  77. self.get, to wait for the other side to send something, or await
  78. self.put w/ data to send.'''
  79. def __init__(self):
  80. self.sendq = asyncio.Queue()
  81. self.recvq = asyncio.Queue()
  82. self.task = None
  83. self.task = asyncio.create_task(self.runner())
  84. self.get = self.sendq.get
  85. self.put = self.recvq.put
  86. async def drain(self):
  87. '''Wait for the runner thread to finish up.'''
  88. return await self.task
  89. async def runner(self): #pragma: no cover
  90. raise NotImplementedError
  91. async def recv(self, timeout=None):
  92. return await self.recvq.get()
  93. async def send(self, data):
  94. return await self.sendq.put(data)
  95. def __del__(self): #pragma: no cover
  96. if self.task is not None and not self.task.done():
  97. self.task.cancel()
  98. class TestSyncData(unittest.IsolatedAsyncioTestCase):
  99. async def test_syncsendtillrecv(self):
  100. class MySync(SyncDatagram):
  101. def __init__(self):
  102. self.sendq = []
  103. self.resp = [ TimeoutError(), b'a' ]
  104. async def recv(self, timeout=None):
  105. assert timeout == 1
  106. r = self.resp.pop(0)
  107. if isinstance(r, Exception):
  108. raise r
  109. return r
  110. async def send(self, data):
  111. self.sendq.append(data)
  112. ms = MySync()
  113. r = await ms.sendtillrecv(b'foo', 1)
  114. self.assertEqual(r, b'a')
  115. self.assertEqual(ms.sendq, [ b'foo', b'foo' ])
  116. def timeout(timeout):
  117. def timeout_wrapper(fun):
  118. @functools.wraps(fun)
  119. async def wrapper(*args, **kwargs):
  120. return await asyncio.wait_for(fun(*args, **kwargs),
  121. timeout)
  122. return wrapper
  123. return timeout_wrapper
  124. class TestLORANode(unittest.IsolatedAsyncioTestCase):
  125. @timeout(2)
  126. async def test_lora(self):
  127. class TestSD(MockSyncDatagram):
  128. async def runner(self):
  129. l = Strobe(domain)
  130. # start handshake
  131. r = await self.get()
  132. pkt = l.recv_enc(r[:-8])
  133. l.recv_mac(r[-8:])
  134. assert pkt.endswith(b'reqreset')
  135. await self.put(l.send_enc(os.urandom(16)) +
  136. l.send_mac(8))
  137. r = await self.get()
  138. c = l.recv_enc(r[:-8])
  139. l.recv_mac(r[-8:])
  140. assert c == b'confirm'
  141. await self.put(l.send_enc(b'confirmed') +
  142. l.send_mac(8))
  143. r = await self.get()
  144. cmd = l.recv_enc(r[:-8])
  145. l.recv_mac(r[-8:])
  146. assert cmd[0] == CMD_WAITFOR
  147. assert int.from_bytes(cmd[1:], byteorder='little') == 30
  148. await self.put(l.send_enc(cmd[0:1]) +
  149. l.send_mac(8))
  150. r = await self.get()
  151. cmd = l.recv_enc(r[:-8])
  152. l.recv_mac(r[-8:])
  153. assert cmd[0] == CMD_RUNFOR
  154. assert int.from_bytes(cmd[1:5], byteorder='little') == 1
  155. assert int.from_bytes(cmd[5:], byteorder='little') == 50
  156. await self.put(l.send_enc(cmd[0:1]) +
  157. l.send_mac(8))
  158. r = await self.get()
  159. cmd = l.recv_enc(r[:-8])
  160. l.recv_mac(r[-8:])
  161. assert cmd[0] == CMD_TERMINATE
  162. await self.put(l.send_enc(cmd[0:1]) +
  163. l.send_mac(8))
  164. tsd = TestSD()
  165. l = LORANode(tsd)
  166. await l.start()
  167. await l.waitfor(30)
  168. await l.runfor(1, 50)
  169. await l.terminate()
  170. await tsd.drain()
  171. # Make sure all messages have been processed
  172. self.assertTrue(tsd.sendq.empty())
  173. self.assertTrue(tsd.recvq.empty())
  174. print('done')