Implement a secure ICS protocol targeting LoRa Node151 microcontroller for controlling irrigation.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

880 lines
21 KiB

  1. # Copyright 2021 John-Mark Gurney.
  2. #
  3. # Redistribution and use in source and binary forms, with or without
  4. # modification, are permitted provided that the following conditions
  5. # are met:
  6. # 1. Redistributions of source code must retain the above copyright
  7. # notice, this list of conditions and the following disclaimer.
  8. # 2. Redistributions in binary form must reproduce the above copyright
  9. # notice, this list of conditions and the following disclaimer in the
  10. # documentation and/or other materials provided with the distribution.
  11. #
  12. # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
  13. # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  14. # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  15. # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
  16. # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
  17. # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
  18. # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
  19. # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
  20. # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
  21. # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
  22. # SUCH DAMAGE.
  23. #
  24. import asyncio
  25. import contextlib
  26. import functools
  27. import itertools
  28. import os
  29. import unittest
  30. from Strobe.Strobe import Strobe, KeccakF
  31. from Strobe.Strobe import AuthenticationFailed
  32. import lora_comms
  33. from lora_comms import make_pktbuf
  34. domain = b'com.funkthat.lora.irrigation.shared.v0.0.1'
  35. # Response to command will be the CMD and any arguments if needed.
  36. # The command is encoded as an unsigned byte
  37. CMD_TERMINATE = 1 # no args: terminate the sesssion, reply confirms
  38. # The follow commands are queue up, but will be acknoledged when queued
  39. CMD_WAITFOR = 2 # arg: (length): waits for length seconds
  40. CMD_RUNFOR = 3 # arg: (chan, length): turns on chan for length seconds
  41. CMD_PING = 4 # arg: (): a no op command
  42. class LORANode(object):
  43. '''Implement a LORANode initiator.'''
  44. MAC_LEN = 8
  45. def __init__(self, syncdatagram, shared=None):
  46. self.sd = syncdatagram
  47. self.st = Strobe(domain, F=KeccakF(800))
  48. if shared is not None:
  49. self.st.key(shared)
  50. async def start(self):
  51. resp = await self.sendrecvvalid(os.urandom(16) + b'reqreset')
  52. self.st.ratchet()
  53. pkt = await self.sendrecvvalid(b'confirm')
  54. if pkt != b'confirmed':
  55. raise RuntimeError('got invalid response: %s' %
  56. repr(pkt))
  57. async def sendrecvvalid(self, msg):
  58. msg = self.st.send_enc(msg) + self.st.send_mac(self.MAC_LEN)
  59. origstate = self.st.copy()
  60. while True:
  61. resp = await self.sd.sendtillrecv(msg, 1)
  62. #_debprint('got:', resp)
  63. # skip empty messages
  64. if len(resp) == 0:
  65. continue
  66. try:
  67. decmsg = self.st.recv_enc(resp[:-self.MAC_LEN])
  68. self.st.recv_mac(resp[-self.MAC_LEN:])
  69. break
  70. except AuthenticationFailed:
  71. # didn't get a valid packet, restore
  72. # state and retry
  73. #_debprint('failed')
  74. self.st.set_state_from(origstate)
  75. #_debprint('got rep:', repr(resp), repr(decmsg))
  76. return decmsg
  77. @staticmethod
  78. def _encodeargs(*args):
  79. r = []
  80. for i in args:
  81. r.append(i.to_bytes(4, byteorder='little'))
  82. return b''.join(r)
  83. async def _sendcmd(self, cmd, *args):
  84. cmdbyte = cmd.to_bytes(1, byteorder='little')
  85. resp = await self.sendrecvvalid(cmdbyte + self._encodeargs(*args))
  86. if resp[0:1] != cmdbyte:
  87. raise RuntimeError(
  88. 'response does not match, got: %s, expected: %s' %
  89. (repr(resp[0:1]), repr(cmdbyte)))
  90. async def waitfor(self, length):
  91. return await self._sendcmd(CMD_WAITFOR, length)
  92. async def runfor(self, chan, length):
  93. return await self._sendcmd(CMD_RUNFOR, chan, length)
  94. async def ping(self):
  95. return await self._sendcmd(CMD_PING)
  96. async def terminate(self):
  97. return await self._sendcmd(CMD_TERMINATE)
  98. class SyncDatagram(object):
  99. '''Base interface for a more simple synchronous interface.'''
  100. def __init__(self): #pragma: no cover
  101. pass
  102. async def recv(self, timeout=None): #pragma: no cover
  103. '''Receive a datagram. If timeout is not None, wait that many
  104. seconds, and if nothing is received in that time, raise an
  105. TimeoutError exception.'''
  106. raise NotImplementedError
  107. async def send(self, data): #pragma: no cover
  108. raise NotImplementedError
  109. async def sendtillrecv(self, data, freq):
  110. '''Send the datagram in data, every freq seconds until a datagram
  111. is received. If timeout seconds happen w/o receiving a datagram,
  112. then raise an TimeoutError exception.'''
  113. while True:
  114. #_debprint('sending:', repr(data))
  115. await self.send(data)
  116. try:
  117. return await self.recv(freq)
  118. except TimeoutError:
  119. pass
  120. class MockSyncDatagram(SyncDatagram):
  121. '''A testing version of SyncDatagram. Define a method runner which
  122. implements part of the sequence. In the function, await on either
  123. self.get, to wait for the other side to send something, or await
  124. self.put w/ data to send.'''
  125. def __init__(self):
  126. self.sendq = asyncio.Queue()
  127. self.recvq = asyncio.Queue()
  128. self.task = asyncio.create_task(self.runner())
  129. self.get = self.sendq.get
  130. self.put = self.recvq.put
  131. async def drain(self):
  132. '''Wait for the runner thread to finish up.'''
  133. return await self.task
  134. async def runner(self): #pragma: no cover
  135. raise NotImplementedError
  136. async def recv(self, timeout=None):
  137. return await self.recvq.get()
  138. async def send(self, data):
  139. return await self.sendq.put(data)
  140. def __del__(self): #pragma: no cover
  141. if self.task is not None and not self.task.done():
  142. self.task.cancel()
  143. class TestSyncData(unittest.IsolatedAsyncioTestCase):
  144. async def test_syncsendtillrecv(self):
  145. class MySync(SyncDatagram):
  146. def __init__(self):
  147. self.sendq = []
  148. self.resp = [ TimeoutError(), b'a' ]
  149. async def recv(self, timeout=None):
  150. assert timeout == 1
  151. r = self.resp.pop(0)
  152. if isinstance(r, Exception):
  153. raise r
  154. return r
  155. async def send(self, data):
  156. self.sendq.append(data)
  157. ms = MySync()
  158. r = await ms.sendtillrecv(b'foo', 1)
  159. self.assertEqual(r, b'a')
  160. self.assertEqual(ms.sendq, [ b'foo', b'foo' ])
  161. def timeout(timeout):
  162. def timeout_wrapper(fun):
  163. @functools.wraps(fun)
  164. async def wrapper(*args, **kwargs):
  165. return await asyncio.wait_for(fun(*args, **kwargs),
  166. timeout)
  167. return wrapper
  168. return timeout_wrapper
  169. def _debprint(*args): # pragma: no cover
  170. import traceback, sys, os.path
  171. st = traceback.extract_stack(limit=2)[0]
  172. sep = ''
  173. if args:
  174. sep = ':'
  175. print('%s:%d%s' % (os.path.basename(st.filename), st.lineno, sep),
  176. *args)
  177. sys.stdout.flush()
  178. class AsyncSequence(object):
  179. '''
  180. Object used for sequencing async functions. To use, use the
  181. asynchronous context manager created by the sync method. For
  182. example:
  183. seq = AsyncSequence()
  184. async func1():
  185. async with seq.sync(1):
  186. second_fun()
  187. async func2():
  188. async with seq.sync(0):
  189. first_fun()
  190. This will make sure that function first_fun is run before running
  191. the function second_fun. If a previous block raises an Exception,
  192. it will be passed up, and all remaining blocks (and future ones)
  193. will raise a CancelledError to help ensure that any tasks are
  194. properly cleaned up.
  195. '''
  196. def __init__(self, positerfactory=lambda: itertools.count()):
  197. '''The argument positerfactory, is a factory that will
  198. create an iterator that will be used for the values that
  199. are passed to the sync method.'''
  200. self.positer = positerfactory()
  201. self.token = object()
  202. self.die = False
  203. self.waiting = {
  204. next(self.positer): self.token
  205. }
  206. @contextlib.asynccontextmanager
  207. async def sync(self, pos):
  208. '''An async context manager that will be run when it's
  209. turn arrives. It will only run when all the previous
  210. items in the iterator has been successfully run.'''
  211. if self.die:
  212. raise asyncio.CancelledError('seq cancelled')
  213. if pos in self.waiting:
  214. if self.waiting[pos] is not self.token:
  215. raise RuntimeError('pos already waiting!')
  216. else:
  217. fut = asyncio.Future()
  218. self.waiting[pos] = fut
  219. await fut
  220. # our time to shine!
  221. del self.waiting[pos]
  222. try:
  223. yield None
  224. except Exception as e:
  225. # if we got an exception, things went pear shaped,
  226. # shut everything down, and any future calls.
  227. #_debprint('dieing...', repr(e))
  228. self.die = True
  229. # cancel existing blocks
  230. while self.waiting:
  231. k, v = self.waiting.popitem()
  232. #_debprint('canceling: %s' % repr(k))
  233. if v is self.token:
  234. continue
  235. # for Python 3.9:
  236. # msg='pos %s raised exception: %s' %
  237. # (repr(pos), repr(e))
  238. v.cancel()
  239. # populate real exception up
  240. raise
  241. else:
  242. # handle next
  243. nextpos = next(self.positer)
  244. if nextpos in self.waiting:
  245. #_debprint('np:', repr(self), nextpos,
  246. # repr(self.waiting[nextpos]))
  247. self.waiting[nextpos].set_result(None)
  248. else:
  249. self.waiting[nextpos] = self.token
  250. class TestSequencing(unittest.IsolatedAsyncioTestCase):
  251. @timeout(2)
  252. async def test_seq_alreadywaiting(self):
  253. waitseq = AsyncSequence()
  254. seq = AsyncSequence()
  255. async def fun1():
  256. async with waitseq.sync(1):
  257. pass
  258. async def fun2():
  259. async with seq.sync(1):
  260. async with waitseq.sync(1): # pragma: no cover
  261. pass
  262. task1 = asyncio.create_task(fun1())
  263. task2 = asyncio.create_task(fun2())
  264. # spin things to make sure things advance
  265. await asyncio.sleep(0)
  266. async with seq.sync(0):
  267. pass
  268. with self.assertRaises(RuntimeError):
  269. await task2
  270. async with waitseq.sync(0):
  271. pass
  272. await task1
  273. @timeout(2)
  274. async def test_seqexc(self):
  275. seq = AsyncSequence()
  276. excseq = AsyncSequence()
  277. async def excfun1():
  278. async with seq.sync(1):
  279. pass
  280. async with excseq.sync(0):
  281. raise ValueError('foo')
  282. # that a block that enters first, but runs after
  283. # raises an exception
  284. async def excfun2():
  285. async with seq.sync(0):
  286. pass
  287. async with excseq.sync(1): # pragma: no cover
  288. pass
  289. # that a block that enters after, raises an
  290. # exception
  291. async def excfun3():
  292. async with seq.sync(2):
  293. pass
  294. async with excseq.sync(2): # pragma: no cover
  295. pass
  296. task1 = asyncio.create_task(excfun1())
  297. task2 = asyncio.create_task(excfun2())
  298. task3 = asyncio.create_task(excfun3())
  299. with self.assertRaises(ValueError):
  300. await task1
  301. with self.assertRaises(asyncio.CancelledError):
  302. await task2
  303. with self.assertRaises(asyncio.CancelledError):
  304. await task3
  305. @timeout(2)
  306. async def test_seq(self):
  307. # test that a seq object when created
  308. seq = AsyncSequence(lambda: itertools.count(1))
  309. col = []
  310. async def fun1():
  311. async with seq.sync(1):
  312. col.append(1)
  313. async with seq.sync(2):
  314. col.append(2)
  315. async with seq.sync(4):
  316. col.append(4)
  317. async def fun2():
  318. async with seq.sync(3):
  319. col.append(3)
  320. async with seq.sync(6):
  321. col.append(6)
  322. async def fun3():
  323. async with seq.sync(5):
  324. col.append(5)
  325. # and various functions are run
  326. task1 = asyncio.create_task(fun1())
  327. task2 = asyncio.create_task(fun2())
  328. task3 = asyncio.create_task(fun3())
  329. # and the functions complete
  330. await task3
  331. await task2
  332. await task1
  333. # that the order they ran in was correct
  334. self.assertEqual(col, list(range(1, 7)))
  335. class TestLORANode(unittest.IsolatedAsyncioTestCase):
  336. @timeout(2)
  337. async def test_lora(self):
  338. _self = self
  339. shared_key = os.urandom(32)
  340. class TestSD(MockSyncDatagram):
  341. async def sendgettest(self, msg):
  342. '''Send the message, but make sure that if a
  343. bad message is sent afterward, that it replies
  344. w/ the same previous message.
  345. '''
  346. await self.put(msg)
  347. resp = await self.get()
  348. await self.put(b'bogusmsg' * 5)
  349. resp2 = await self.get()
  350. _self.assertEqual(resp, resp2)
  351. return resp
  352. async def runner(self):
  353. l = Strobe(domain, F=KeccakF(800))
  354. l.key(shared_key)
  355. # start handshake
  356. r = await self.get()
  357. pkt = l.recv_enc(r[:-8])
  358. l.recv_mac(r[-8:])
  359. assert pkt.endswith(b'reqreset')
  360. # make sure junk gets ignored
  361. await self.put(b'sdlfkj')
  362. # and that the packet remains the same
  363. _self.assertEqual(r, await self.get())
  364. # and a couple more times
  365. await self.put(b'0' * 24)
  366. _self.assertEqual(r, await self.get())
  367. await self.put(b'0' * 32)
  368. _self.assertEqual(r, await self.get())
  369. # send the response
  370. await self.put(l.send_enc(os.urandom(16)) +
  371. l.send_mac(8))
  372. # require no more back tracking at this point
  373. l.ratchet()
  374. # get the confirmation message
  375. r = await self.get()
  376. # test the resend capabilities
  377. await self.put(b'0' * 24)
  378. _self.assertEqual(r, await self.get())
  379. # decode confirmation message
  380. c = l.recv_enc(r[:-8])
  381. l.recv_mac(r[-8:])
  382. # assert that we got it
  383. _self.assertEqual(c, b'confirm')
  384. # send confirmed reply
  385. r = await self.sendgettest(l.send_enc(
  386. b'confirmed') + l.send_mac(8))
  387. # test and decode remaining command messages
  388. cmd = l.recv_enc(r[:-8])
  389. l.recv_mac(r[-8:])
  390. assert cmd[0] == CMD_WAITFOR
  391. assert int.from_bytes(cmd[1:],
  392. byteorder='little') == 30
  393. r = await self.sendgettest(l.send_enc(
  394. cmd[0:1]) + l.send_mac(8))
  395. cmd = l.recv_enc(r[:-8])
  396. l.recv_mac(r[-8:])
  397. assert cmd[0] == CMD_RUNFOR
  398. assert int.from_bytes(cmd[1:5],
  399. byteorder='little') == 1
  400. assert int.from_bytes(cmd[5:],
  401. byteorder='little') == 50
  402. r = await self.sendgettest(l.send_enc(
  403. cmd[0:1]) + l.send_mac(8))
  404. cmd = l.recv_enc(r[:-8])
  405. l.recv_mac(r[-8:])
  406. assert cmd[0] == CMD_TERMINATE
  407. await self.put(l.send_enc(cmd[0:1]) +
  408. l.send_mac(8))
  409. tsd = TestSD()
  410. l = LORANode(tsd, shared=shared_key)
  411. await l.start()
  412. await l.waitfor(30)
  413. await l.runfor(1, 50)
  414. await l.terminate()
  415. await tsd.drain()
  416. # Make sure all messages have been processed
  417. self.assertTrue(tsd.sendq.empty())
  418. self.assertTrue(tsd.recvq.empty())
  419. #_debprint('done')
  420. @timeout(2)
  421. async def test_ccode_badmsgs(self):
  422. # Test to make sure that various bad messages in the
  423. # handshake process are rejected even if the attacker
  424. # has the correct key. This just keeps the protocol
  425. # tight allowing for variations in the future.
  426. # seed the RNG
  427. prngseed = b'abc123'
  428. from ctypes import c_uint8
  429. lora_comms.strobe_seed_prng((c_uint8 *
  430. len(prngseed))(*prngseed), len(prngseed))
  431. # Create the state for testing
  432. commstate = lora_comms.CommsState()
  433. cb = lora_comms.process_msgfunc_t(lambda msg, outbuf: None)
  434. # Generate shared key
  435. shared_key = os.urandom(32)
  436. # Initialize everything
  437. lora_comms.comms_init(commstate, cb, make_pktbuf(shared_key))
  438. # Create test fixture, only use it to init crypto state
  439. tsd = SyncDatagram()
  440. l = LORANode(tsd, shared=shared_key)
  441. # copy the crypto state
  442. cstate = l.st.copy()
  443. # compose an incorrect init message
  444. msg = os.urandom(16) + b'othre'
  445. msg = cstate.send_enc(msg) + cstate.send_mac(l.MAC_LEN)
  446. out = lora_comms.comms_process_wrap(commstate, msg)
  447. self.assertFalse(out)
  448. # copy the crypto state
  449. cstate = l.st.copy()
  450. # compose an incorrect init message
  451. msg = os.urandom(16) + b' eqreset'
  452. msg = cstate.send_enc(msg) + cstate.send_mac(l.MAC_LEN)
  453. out = lora_comms.comms_process_wrap(commstate, msg)
  454. self.assertFalse(out)
  455. # compose the correct init message
  456. msg = os.urandom(16) + b'reqreset'
  457. msg = l.st.send_enc(msg) + l.st.send_mac(l.MAC_LEN)
  458. out = lora_comms.comms_process_wrap(commstate, msg)
  459. l.st.recv_enc(out[:-l.MAC_LEN])
  460. l.st.recv_mac(out[-l.MAC_LEN:])
  461. l.st.ratchet()
  462. # copy the crypto state
  463. cstate = l.st.copy()
  464. # compose an incorrect confirmed message
  465. msg = b'onfirm'
  466. msg = cstate.send_enc(msg) + cstate.send_mac(l.MAC_LEN)
  467. out = lora_comms.comms_process_wrap(commstate, msg)
  468. self.assertFalse(out)
  469. # copy the crypto state
  470. cstate = l.st.copy()
  471. # compose an incorrect confirmed message
  472. msg = b' onfirm'
  473. msg = cstate.send_enc(msg) + cstate.send_mac(l.MAC_LEN)
  474. out = lora_comms.comms_process_wrap(commstate, msg)
  475. self.assertFalse(out)
  476. @timeout(2)
  477. async def test_ccode(self):
  478. _self = self
  479. from ctypes import c_uint8
  480. # seed the RNG
  481. prngseed = b'abc123'
  482. lora_comms.strobe_seed_prng((c_uint8 *
  483. len(prngseed))(*prngseed), len(prngseed))
  484. # Create the state for testing
  485. commstate = lora_comms.CommsState()
  486. # These are the expected messages and their arguments
  487. exptmsgs = [
  488. (CMD_WAITFOR, [ 30 ]),
  489. (CMD_RUNFOR, [ 1, 50 ]),
  490. (CMD_PING, [ ]),
  491. (CMD_TERMINATE, [ ]),
  492. ]
  493. def procmsg(msg, outbuf):
  494. msgbuf = msg._from()
  495. cmd = msgbuf[0]
  496. args = [ int.from_bytes(msgbuf[x:x + 4],
  497. byteorder='little') for x in range(1, len(msgbuf),
  498. 4) ]
  499. if exptmsgs[0] == (cmd, args):
  500. exptmsgs.pop(0)
  501. outbuf[0].pkt[0] = cmd
  502. outbuf[0].pktlen = 1
  503. else: #pragma: no cover
  504. raise RuntimeError('cmd not found')
  505. # wrap the callback function
  506. cb = lora_comms.process_msgfunc_t(procmsg)
  507. class CCodeSD(MockSyncDatagram):
  508. async def runner(self):
  509. for expectlen in [ 24, 17, 9, 9, 9, 9 ]:
  510. # get message
  511. inmsg = await self.get()
  512. # process the test message
  513. out = lora_comms.comms_process_wrap(
  514. commstate, inmsg)
  515. # make sure the reply matches length
  516. _self.assertEqual(expectlen, len(out))
  517. # save what was originally replied
  518. origmsg = out
  519. # pretend that the reply didn't make it
  520. out = lora_comms.comms_process_wrap(
  521. commstate, inmsg)
  522. # make sure that the reply matches
  523. # the previous
  524. _self.assertEqual(origmsg, out)
  525. # pass the reply back
  526. await self.put(out)
  527. # Generate shared key
  528. shared_key = os.urandom(32)
  529. # Initialize everything
  530. lora_comms.comms_init(commstate, cb, make_pktbuf(shared_key))
  531. # Create test fixture
  532. tsd = CCodeSD()
  533. l = LORANode(tsd, shared=shared_key)
  534. # Send various messages
  535. await l.start()
  536. await l.waitfor(30)
  537. await l.runfor(1, 50)
  538. await l.ping()
  539. await l.terminate()
  540. await tsd.drain()
  541. # Make sure all messages have been processed
  542. self.assertTrue(tsd.sendq.empty())
  543. self.assertTrue(tsd.recvq.empty())
  544. # Make sure all the expected messages have been
  545. # processed.
  546. self.assertFalse(exptmsgs)
  547. #_debprint('done')
  548. @timeout(2)
  549. async def test_ccode_newsession(self):
  550. '''This test is to make sure that if an existing session
  551. is running, that a new session can be established, and that
  552. when it does, the old session becomes inactive.
  553. '''
  554. _self = self
  555. from ctypes import c_uint8
  556. seq = AsyncSequence()
  557. # seed the RNG
  558. prngseed = b'abc123'
  559. lora_comms.strobe_seed_prng((c_uint8 *
  560. len(prngseed))(*prngseed), len(prngseed))
  561. # Create the state for testing
  562. commstate = lora_comms.CommsState()
  563. # These are the expected messages and their arguments
  564. exptmsgs = [
  565. (CMD_WAITFOR, [ 30 ]),
  566. (CMD_WAITFOR, [ 70 ]),
  567. (CMD_WAITFOR, [ 40 ]),
  568. (CMD_TERMINATE, [ ]),
  569. ]
  570. def procmsg(msg, outbuf):
  571. msgbuf = msg._from()
  572. cmd = msgbuf[0]
  573. args = [ int.from_bytes(msgbuf[x:x + 4],
  574. byteorder='little') for x in range(1, len(msgbuf),
  575. 4) ]
  576. if exptmsgs[0] == (cmd, args):
  577. exptmsgs.pop(0)
  578. outbuf[0].pkt[0] = cmd
  579. outbuf[0].pktlen = 1
  580. else: #pragma: no cover
  581. raise RuntimeError('cmd not found: %d' % cmd)
  582. # wrap the callback function
  583. cb = lora_comms.process_msgfunc_t(procmsg)
  584. class FlipMsg(object):
  585. async def flipmsg(self):
  586. # get message
  587. inmsg = await self.get()
  588. # process the test message
  589. out = lora_comms.comms_process_wrap(
  590. commstate, inmsg)
  591. # pass the reply back
  592. await self.put(out)
  593. # this class always passes messages, this is
  594. # used for the first session.
  595. class CCodeSD1(MockSyncDatagram, FlipMsg):
  596. async def runner(self):
  597. for i in range(3):
  598. await self.flipmsg()
  599. async with seq.sync(0):
  600. # create bogus message
  601. inmsg = b'0'*24
  602. # process the bogus message
  603. out = lora_comms.comms_process_wrap(
  604. commstate, inmsg)
  605. # make sure there was not a response
  606. _self.assertFalse(out)
  607. await self.flipmsg()
  608. # this one is special in that it will pause after the first
  609. # message to ensure that the previous session will continue
  610. # to work, AND that if a new "new" session comes along, it
  611. # will override the previous new session that hasn't been
  612. # confirmed yet.
  613. class CCodeSD2(MockSyncDatagram, FlipMsg):
  614. async def runner(self):
  615. # pass one message from the new session
  616. async with seq.sync(1):
  617. # There might be a missing case
  618. # handled for when the confirmed
  619. # message is generated, but lost.
  620. await self.flipmsg()
  621. # and the old session is still active
  622. await l.waitfor(70)
  623. async with seq.sync(2):
  624. for i in range(3):
  625. await self.flipmsg()
  626. # Generate shared key
  627. shared_key = os.urandom(32)
  628. # Initialize everything
  629. lora_comms.comms_init(commstate, cb, make_pktbuf(shared_key))
  630. # Create test fixture
  631. tsd = CCodeSD1()
  632. l = LORANode(tsd, shared=shared_key)
  633. # Send various messages
  634. await l.start()
  635. await l.waitfor(30)
  636. # Ensure that a new one can take over
  637. tsd2 = CCodeSD2()
  638. l2 = LORANode(tsd2, shared=shared_key)
  639. # Send various messages
  640. await l2.start()
  641. await l2.waitfor(40)
  642. await l2.terminate()
  643. await tsd.drain()
  644. await tsd2.drain()
  645. # Make sure all messages have been processed
  646. self.assertTrue(tsd.sendq.empty())
  647. self.assertTrue(tsd.recvq.empty())
  648. self.assertTrue(tsd2.sendq.empty())
  649. self.assertTrue(tsd2.recvq.empty())
  650. # Make sure all the expected messages have been
  651. # processed.
  652. self.assertFalse(exptmsgs)