Implement a secure ICS protocol targeting LoRa Node151 microcontroller for controlling irrigation.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

1152 lines
28 KiB

  1. # Copyright 2021 John-Mark Gurney.
  2. #
  3. # Redistribution and use in source and binary forms, with or without
  4. # modification, are permitted provided that the following conditions
  5. # are met:
  6. # 1. Redistributions of source code must retain the above copyright
  7. # notice, this list of conditions and the following disclaimer.
  8. # 2. Redistributions in binary form must reproduce the above copyright
  9. # notice, this list of conditions and the following disclaimer in the
  10. # documentation and/or other materials provided with the distribution.
  11. #
  12. # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
  13. # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  14. # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  15. # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
  16. # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
  17. # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
  18. # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
  19. # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
  20. # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
  21. # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
  22. # SUCH DAMAGE.
  23. #
  24. import asyncio
  25. import contextlib
  26. import functools
  27. import itertools
  28. import os
  29. import sys
  30. import unittest
  31. from Strobe.Strobe import Strobe, KeccakF
  32. from Strobe.Strobe import AuthenticationFailed
  33. import lora_comms
  34. from lora_comms import make_pktbuf
  35. import multicast
  36. from util import *
  37. domain = b'com.funkthat.lora.irrigation.shared.v0.0.1'
  38. # Response to command will be the CMD and any arguments if needed.
  39. # The command is encoded as an unsigned byte
  40. CMD_TERMINATE = 1 # no args: terminate the sesssion, reply confirms
  41. # The follow commands are queue up, but will be acknoledged when queued
  42. CMD_WAITFOR = 2 # arg: (length): waits for length seconds
  43. CMD_RUNFOR = 3 # arg: (chan, length): turns on chan for length seconds
  44. CMD_PING = 4 # arg: (): a no op command
  45. CMD_SETUNSET = 5 # arg: (chan, val): sets chan to val
  46. CMD_ADV = 6 # arg: ([cnt]): advances to the next cnt (default 1) command
  47. CMD_CLEAR = 7 # arg: (): clears all future commands, but keeps current running
  48. class LORANode(object):
  49. '''Implement a LORANode initiator.'''
  50. MAC_LEN = 8
  51. def __init__(self, syncdatagram, shared=None):
  52. self.sd = syncdatagram
  53. self.st = Strobe(domain, F=KeccakF(800))
  54. if shared is not None:
  55. self.st.key(shared)
  56. async def start(self):
  57. resp = await self.sendrecvvalid(os.urandom(16) + b'reqreset')
  58. self.st.ratchet()
  59. pkt = await self.sendrecvvalid(b'confirm')
  60. if pkt != b'confirmed':
  61. raise RuntimeError('got invalid response: %s' %
  62. repr(pkt))
  63. async def sendrecvvalid(self, msg):
  64. msg = self.st.send_enc(msg) + self.st.send_mac(self.MAC_LEN)
  65. origstate = self.st.copy()
  66. while True:
  67. resp = await self.sd.sendtillrecv(msg, .25)
  68. #_debprint('got:', resp)
  69. # skip empty messages
  70. if len(resp) == 0:
  71. continue
  72. try:
  73. decmsg = self.st.recv_enc(resp[:-self.MAC_LEN])
  74. self.st.recv_mac(resp[-self.MAC_LEN:])
  75. break
  76. except AuthenticationFailed:
  77. # didn't get a valid packet, restore
  78. # state and retry
  79. #_debprint('failed')
  80. self.st.set_state_from(origstate)
  81. #_debprint('got rep:', repr(resp), repr(decmsg))
  82. return decmsg
  83. @staticmethod
  84. def _encodeargs(*args):
  85. r = []
  86. for i in args:
  87. r.append(i.to_bytes(4, byteorder='little'))
  88. return b''.join(r)
  89. async def _sendcmd(self, cmd, *args):
  90. cmdbyte = cmd.to_bytes(1, byteorder='little')
  91. resp = await self.sendrecvvalid(cmdbyte + self._encodeargs(*args))
  92. if resp[0:1] != cmdbyte:
  93. raise RuntimeError(
  94. 'response does not match, got: %s, expected: %s' %
  95. (repr(resp[0:1]), repr(cmdbyte)))
  96. async def waitfor(self, length):
  97. return await self._sendcmd(CMD_WAITFOR, length)
  98. async def runfor(self, chan, length):
  99. return await self._sendcmd(CMD_RUNFOR, chan, length)
  100. async def setunset(self, chan, val):
  101. return await self._sendcmd(CMD_SETUNSET, chan, val)
  102. async def ping(self):
  103. return await self._sendcmd(CMD_PING)
  104. async def adv(self, cnt=None):
  105. args = ()
  106. if cnt is not None:
  107. args = (cnt, )
  108. return await self._sendcmd(CMD_ADV, *args)
  109. async def clear(self):
  110. return await self._sendcmd(CMD_CLEAR)
  111. async def terminate(self):
  112. return await self._sendcmd(CMD_TERMINATE)
  113. class SyncDatagram(object):
  114. '''Base interface for a more simple synchronous interface.'''
  115. async def recv(self, timeout=None): #pragma: no cover
  116. '''Receive a datagram. If timeout is not None, wait that many
  117. seconds, and if nothing is received in that time, raise an
  118. asyncio.TimeoutError exception.'''
  119. raise NotImplementedError
  120. async def send(self, data): #pragma: no cover
  121. raise NotImplementedError
  122. async def sendtillrecv(self, data, freq):
  123. '''Send the datagram in data, every freq seconds until a datagram
  124. is received. If timeout seconds happen w/o receiving a datagram,
  125. then raise an TimeoutError exception.'''
  126. while True:
  127. #_debprint('sending:', repr(data))
  128. await self.send(data)
  129. try:
  130. return await self.recv(freq)
  131. except asyncio.TimeoutError:
  132. pass
  133. class MulticastSyncDatagram(SyncDatagram):
  134. '''
  135. An implementation of SyncDatagram that uses the provided
  136. multicast address maddr as the source/sink of the packets.
  137. Note that once created, the start coroutine needs to be
  138. await'd before being passed to a LORANode so that everything
  139. is running.
  140. '''
  141. # Note: sent packets will be received. A similar method to
  142. # what was done in multicast.{to,from}_loragw could be done
  143. # here as well, that is passing in a set of packets to not
  144. # pass back up.
  145. def __init__(self, maddr):
  146. self.maddr = maddr
  147. self._ignpkts = set()
  148. async def start(self):
  149. self.mr = await multicast.create_multicast_receiver(self.maddr)
  150. self.mt = await multicast.create_multicast_transmitter(
  151. self.maddr)
  152. async def _recv(self):
  153. while True:
  154. pkt = await self.mr.recv()
  155. pkt = pkt[0]
  156. if pkt not in self._ignpkts:
  157. return pkt
  158. self._ignpkts.remove(pkt)
  159. async def recv(self, timeout=None): #pragma: no cover
  160. r = await asyncio.wait_for(self._recv(), timeout=timeout)
  161. return r
  162. async def send(self, data): #pragma: no cover
  163. self._ignpkts.add(bytes(data))
  164. await self.mt.send(data)
  165. def close(self):
  166. '''Shutdown communications.'''
  167. self.mr.close()
  168. self.mr = None
  169. self.mt.close()
  170. self.mt = None
  171. def listsplit(lst, item):
  172. try:
  173. idx = lst.index(item)
  174. except ValueError:
  175. return lst, []
  176. return lst[:idx], lst[idx + 1:]
  177. async def main():
  178. import argparse
  179. from loraserv import DEFAULT_MADDR as maddr
  180. parser = argparse.ArgumentParser()
  181. parser.add_argument('-f', dest='schedfile', metavar='filename', type=str,
  182. help='Use commands from the file. One command per line.')
  183. parser.add_argument('-r', dest='client', metavar='module:function', type=str,
  184. help='Create a respondant instead of sending commands. Commands will be passed to the function.')
  185. parser.add_argument('-s', dest='shared_key', metavar='shared_key', type=str, required=True,
  186. help='The shared key (encoded as UTF-8) to use.')
  187. parser.add_argument('args', metavar='CMD_ARG', type=str, nargs='*',
  188. help='Various commands to send to the device.')
  189. args = parser.parse_args()
  190. shared_key = args.shared_key.encode('utf-8')
  191. if args.client:
  192. # Run a client
  193. mr = await multicast.create_multicast_receiver(maddr)
  194. mt = await multicast.create_multicast_transmitter(maddr)
  195. from ctypes import c_uint8
  196. # seed the RNG
  197. prngseed = os.urandom(64)
  198. lora_comms.strobe_seed_prng((c_uint8 *
  199. len(prngseed))(*prngseed), len(prngseed))
  200. # Create the state for testing
  201. commstate = lora_comms.CommsState()
  202. import util_load
  203. client_func = util_load.load_application(args.client)
  204. def client_call(msg, outbuf):
  205. ret = client_func(msg._from())
  206. if len(ret) > outbuf[0].pktlen:
  207. ret = b'error, too long buffer: %d' % len(ret)
  208. outbuf[0].pktlen = min(len(ret), outbuf[0].pktlen)
  209. for i in range(outbuf[0].pktlen):
  210. outbuf[0].pkt[i] = ret[i]
  211. cb = lora_comms.process_msgfunc_t(client_call)
  212. # Initialize everything
  213. lora_comms.comms_init(commstate, cb, make_pktbuf(shared_key))
  214. try:
  215. while True:
  216. pkt = await mr.recv()
  217. msg = pkt[0]
  218. out = lora_comms.comms_process_wrap(
  219. commstate, msg)
  220. if out:
  221. await mt.send(out)
  222. finally:
  223. mr.close()
  224. mt.close()
  225. sys.exit(0)
  226. msd = MulticastSyncDatagram(maddr)
  227. await msd.start()
  228. l = LORANode(msd, shared=shared_key)
  229. await l.start()
  230. valid_cmds = {
  231. 'waitfor', 'setunset', 'runfor', 'ping', 'adv', 'clear',
  232. 'terminate',
  233. }
  234. if args.args and args.schedfile:
  235. parser.error('only one of -f or arguments can be specified.')
  236. if args.args:
  237. cmds = list(args.args)
  238. cmdargs = []
  239. while cmds:
  240. a, cmds = listsplit(cmds, '--')
  241. cmdargs.append(a)
  242. else:
  243. with open(args.schedfile) as fp:
  244. cmdargs = [ x.split() for x in fp.readlines() ]
  245. while cmdargs:
  246. cmd, *args = cmdargs.pop(0)
  247. if cmd not in valid_cmds:
  248. print('invalid command:', repr(cmd))
  249. sys.exit(1)
  250. fun = getattr(l, cmd)
  251. await fun(*(int(x) for x in args))
  252. if __name__ == '__main__':
  253. asyncio.run(main())
  254. class MockSyncDatagram(SyncDatagram):
  255. '''A testing version of SyncDatagram. Define a method runner which
  256. implements part of the sequence. In the function, await on either
  257. self.get, to wait for the other side to send something, or await
  258. self.put w/ data to send.'''
  259. def __init__(self):
  260. self.sendq = asyncio.Queue()
  261. self.recvq = asyncio.Queue()
  262. self.task = asyncio.create_task(self.runner())
  263. self.get = self.sendq.get
  264. self.put = self.recvq.put
  265. async def drain(self):
  266. '''Wait for the runner thread to finish up.'''
  267. return await self.task
  268. async def runner(self): #pragma: no cover
  269. raise NotImplementedError
  270. async def recv(self, timeout=None):
  271. return await self.recvq.get()
  272. async def send(self, data):
  273. return await self.sendq.put(data)
  274. def __del__(self): #pragma: no cover
  275. if self.task is not None and not self.task.done():
  276. self.task.cancel()
  277. class TestSyncData(unittest.IsolatedAsyncioTestCase):
  278. async def test_syncsendtillrecv(self):
  279. class MySync(SyncDatagram):
  280. def __init__(self):
  281. self.sendq = []
  282. self.resp = [ asyncio.TimeoutError(), b'a' ]
  283. async def recv(self, timeout=None):
  284. assert timeout == 1
  285. r = self.resp.pop(0)
  286. if isinstance(r, Exception):
  287. raise r
  288. return r
  289. async def send(self, data):
  290. self.sendq.append(data)
  291. ms = MySync()
  292. r = await ms.sendtillrecv(b'foo', 1)
  293. self.assertEqual(r, b'a')
  294. self.assertEqual(ms.sendq, [ b'foo', b'foo' ])
  295. class AsyncSequence(object):
  296. '''
  297. Object used for sequencing async functions. To use, use the
  298. asynchronous context manager created by the sync method. For
  299. example:
  300. seq = AsyncSequence()
  301. async func1():
  302. async with seq.sync(1):
  303. second_fun()
  304. async func2():
  305. async with seq.sync(0):
  306. first_fun()
  307. This will make sure that function first_fun is run before running
  308. the function second_fun. If a previous block raises an Exception,
  309. it will be passed up, and all remaining blocks (and future ones)
  310. will raise a CancelledError to help ensure that any tasks are
  311. properly cleaned up.
  312. '''
  313. def __init__(self, positerfactory=lambda: itertools.count()):
  314. '''The argument positerfactory, is a factory that will
  315. create an iterator that will be used for the values that
  316. are passed to the sync method.'''
  317. self.positer = positerfactory()
  318. self.token = object()
  319. self.die = False
  320. self.waiting = {
  321. next(self.positer): self.token
  322. }
  323. async def simpsync(self, pos):
  324. async with self.sync(pos):
  325. pass
  326. @contextlib.asynccontextmanager
  327. async def sync(self, pos):
  328. '''An async context manager that will be run when it's
  329. turn arrives. It will only run when all the previous
  330. items in the iterator has been successfully run.'''
  331. if self.die:
  332. raise asyncio.CancelledError('seq cancelled')
  333. if pos in self.waiting:
  334. if self.waiting[pos] is not self.token:
  335. raise RuntimeError('pos already waiting!')
  336. else:
  337. fut = asyncio.Future()
  338. self.waiting[pos] = fut
  339. await fut
  340. # our time to shine!
  341. del self.waiting[pos]
  342. try:
  343. yield None
  344. except Exception as e:
  345. # if we got an exception, things went pear shaped,
  346. # shut everything down, and any future calls.
  347. #_debprint('dieing...', repr(e))
  348. self.die = True
  349. # cancel existing blocks
  350. while self.waiting:
  351. k, v = self.waiting.popitem()
  352. #_debprint('canceling: %s' % repr(k))
  353. if v is self.token:
  354. continue
  355. # for Python 3.9:
  356. # msg='pos %s raised exception: %s' %
  357. # (repr(pos), repr(e))
  358. v.cancel()
  359. # populate real exception up
  360. raise
  361. else:
  362. # handle next
  363. nextpos = next(self.positer)
  364. if nextpos in self.waiting:
  365. #_debprint('np:', repr(self), nextpos,
  366. # repr(self.waiting[nextpos]))
  367. self.waiting[nextpos].set_result(None)
  368. else:
  369. self.waiting[nextpos] = self.token
  370. class TestSequencing(unittest.IsolatedAsyncioTestCase):
  371. @timeout(2)
  372. async def test_seq_alreadywaiting(self):
  373. waitseq = AsyncSequence()
  374. seq = AsyncSequence()
  375. async def fun1():
  376. async with waitseq.sync(1):
  377. pass
  378. async def fun2():
  379. async with seq.sync(1):
  380. async with waitseq.sync(1): # pragma: no cover
  381. pass
  382. task1 = asyncio.create_task(fun1())
  383. task2 = asyncio.create_task(fun2())
  384. # spin things to make sure things advance
  385. await asyncio.sleep(0)
  386. async with seq.sync(0):
  387. pass
  388. with self.assertRaises(RuntimeError):
  389. await task2
  390. async with waitseq.sync(0):
  391. pass
  392. await task1
  393. @timeout(2)
  394. async def test_seqexc(self):
  395. seq = AsyncSequence()
  396. excseq = AsyncSequence()
  397. async def excfun1():
  398. async with seq.sync(1):
  399. pass
  400. async with excseq.sync(0):
  401. raise ValueError('foo')
  402. # that a block that enters first, but runs after
  403. # raises an exception
  404. async def excfun2():
  405. async with seq.sync(0):
  406. pass
  407. async with excseq.sync(1): # pragma: no cover
  408. pass
  409. # that a block that enters after, raises an
  410. # exception
  411. async def excfun3():
  412. async with seq.sync(2):
  413. pass
  414. async with excseq.sync(2): # pragma: no cover
  415. pass
  416. task1 = asyncio.create_task(excfun1())
  417. task2 = asyncio.create_task(excfun2())
  418. task3 = asyncio.create_task(excfun3())
  419. with self.assertRaises(ValueError):
  420. await task1
  421. with self.assertRaises(asyncio.CancelledError):
  422. await task2
  423. with self.assertRaises(asyncio.CancelledError):
  424. await task3
  425. @timeout(2)
  426. async def test_seq(self):
  427. # test that a seq object when created
  428. seq = AsyncSequence(lambda: itertools.count(1))
  429. col = []
  430. async def fun1():
  431. async with seq.sync(1):
  432. col.append(1)
  433. async with seq.sync(2):
  434. col.append(2)
  435. async with seq.sync(4):
  436. col.append(4)
  437. async def fun2():
  438. async with seq.sync(3):
  439. col.append(3)
  440. async with seq.sync(6):
  441. col.append(6)
  442. async def fun3():
  443. async with seq.sync(5):
  444. col.append(5)
  445. # and various functions are run
  446. task1 = asyncio.create_task(fun1())
  447. task2 = asyncio.create_task(fun2())
  448. task3 = asyncio.create_task(fun3())
  449. # and the functions complete
  450. await task3
  451. await task2
  452. await task1
  453. # that the order they ran in was correct
  454. self.assertEqual(col, list(range(1, 7)))
  455. class TestLORANode(unittest.IsolatedAsyncioTestCase):
  456. @timeout(2)
  457. async def test_lora(self):
  458. _self = self
  459. shared_key = os.urandom(32)
  460. class TestSD(MockSyncDatagram):
  461. async def sendgettest(self, msg):
  462. '''Send the message, but make sure that if a
  463. bad message is sent afterward, that it replies
  464. w/ the same previous message.
  465. '''
  466. await self.put(msg)
  467. resp = await self.get()
  468. await self.put(b'bogusmsg' * 5)
  469. resp2 = await self.get()
  470. _self.assertEqual(resp, resp2)
  471. return resp
  472. async def runner(self):
  473. l = Strobe(domain, F=KeccakF(800))
  474. l.key(shared_key)
  475. # start handshake
  476. r = await self.get()
  477. pkt = l.recv_enc(r[:-8])
  478. l.recv_mac(r[-8:])
  479. assert pkt.endswith(b'reqreset')
  480. # make sure junk gets ignored
  481. await self.put(b'sdlfkj')
  482. # and that the packet remains the same
  483. _self.assertEqual(r, await self.get())
  484. # and a couple more times
  485. await self.put(b'0' * 24)
  486. _self.assertEqual(r, await self.get())
  487. await self.put(b'0' * 32)
  488. _self.assertEqual(r, await self.get())
  489. # send the response
  490. await self.put(l.send_enc(os.urandom(16)) +
  491. l.send_mac(8))
  492. # require no more back tracking at this point
  493. l.ratchet()
  494. # get the confirmation message
  495. r = await self.get()
  496. # test the resend capabilities
  497. await self.put(b'0' * 24)
  498. _self.assertEqual(r, await self.get())
  499. # decode confirmation message
  500. c = l.recv_enc(r[:-8])
  501. l.recv_mac(r[-8:])
  502. # assert that we got it
  503. _self.assertEqual(c, b'confirm')
  504. # send confirmed reply
  505. r = await self.sendgettest(l.send_enc(
  506. b'confirmed') + l.send_mac(8))
  507. # test and decode remaining command messages
  508. cmd = l.recv_enc(r[:-8])
  509. l.recv_mac(r[-8:])
  510. assert cmd[0] == CMD_WAITFOR
  511. assert int.from_bytes(cmd[1:],
  512. byteorder='little') == 30
  513. r = await self.sendgettest(l.send_enc(
  514. cmd[0:1]) + l.send_mac(8))
  515. cmd = l.recv_enc(r[:-8])
  516. l.recv_mac(r[-8:])
  517. assert cmd[0] == CMD_RUNFOR
  518. assert int.from_bytes(cmd[1:5],
  519. byteorder='little') == 1
  520. assert int.from_bytes(cmd[5:],
  521. byteorder='little') == 50
  522. r = await self.sendgettest(l.send_enc(
  523. cmd[0:1]) + l.send_mac(8))
  524. cmd = l.recv_enc(r[:-8])
  525. l.recv_mac(r[-8:])
  526. assert cmd[0] == CMD_TERMINATE
  527. await self.put(l.send_enc(cmd[0:1]) +
  528. l.send_mac(8))
  529. tsd = TestSD()
  530. l = LORANode(tsd, shared=shared_key)
  531. await l.start()
  532. await l.waitfor(30)
  533. await l.runfor(1, 50)
  534. await l.terminate()
  535. await tsd.drain()
  536. # Make sure all messages have been processed
  537. self.assertTrue(tsd.sendq.empty())
  538. self.assertTrue(tsd.recvq.empty())
  539. #_debprint('done')
  540. @timeout(2)
  541. async def test_ccode_badmsgs(self):
  542. # Test to make sure that various bad messages in the
  543. # handshake process are rejected even if the attacker
  544. # has the correct key. This just keeps the protocol
  545. # tight allowing for variations in the future.
  546. # seed the RNG
  547. prngseed = b'abc123'
  548. from ctypes import c_uint8
  549. lora_comms.strobe_seed_prng((c_uint8 *
  550. len(prngseed))(*prngseed), len(prngseed))
  551. # Create the state for testing
  552. commstate = lora_comms.CommsState()
  553. cb = lora_comms.process_msgfunc_t(lambda msg, outbuf: None)
  554. # Generate shared key
  555. shared_key = os.urandom(32)
  556. # Initialize everything
  557. lora_comms.comms_init(commstate, cb, make_pktbuf(shared_key))
  558. # Create test fixture, only use it to init crypto state
  559. tsd = SyncDatagram()
  560. l = LORANode(tsd, shared=shared_key)
  561. # copy the crypto state
  562. cstate = l.st.copy()
  563. # compose an incorrect init message
  564. msg = os.urandom(16) + b'othre'
  565. msg = cstate.send_enc(msg) + cstate.send_mac(l.MAC_LEN)
  566. out = lora_comms.comms_process_wrap(commstate, msg)
  567. self.assertFalse(out)
  568. # that varous short messages don't cause problems
  569. for i in range(10):
  570. out = lora_comms.comms_process_wrap(commstate, b'0' * i)
  571. self.assertFalse(out)
  572. # copy the crypto state
  573. cstate = l.st.copy()
  574. # compose an incorrect init message
  575. msg = os.urandom(16) + b' eqreset'
  576. msg = cstate.send_enc(msg) + cstate.send_mac(l.MAC_LEN)
  577. out = lora_comms.comms_process_wrap(commstate, msg)
  578. self.assertFalse(out)
  579. # compose the correct init message
  580. msg = os.urandom(16) + b'reqreset'
  581. msg = l.st.send_enc(msg) + l.st.send_mac(l.MAC_LEN)
  582. out = lora_comms.comms_process_wrap(commstate, msg)
  583. l.st.recv_enc(out[:-l.MAC_LEN])
  584. l.st.recv_mac(out[-l.MAC_LEN:])
  585. l.st.ratchet()
  586. # copy the crypto state
  587. cstate = l.st.copy()
  588. # compose an incorrect confirmed message
  589. msg = b'onfirm'
  590. msg = cstate.send_enc(msg) + cstate.send_mac(l.MAC_LEN)
  591. out = lora_comms.comms_process_wrap(commstate, msg)
  592. self.assertFalse(out)
  593. # copy the crypto state
  594. cstate = l.st.copy()
  595. # compose an incorrect confirmed message
  596. msg = b' onfirm'
  597. msg = cstate.send_enc(msg) + cstate.send_mac(l.MAC_LEN)
  598. out = lora_comms.comms_process_wrap(commstate, msg)
  599. self.assertFalse(out)
  600. @timeout(2)
  601. async def test_ccode(self):
  602. _self = self
  603. from ctypes import c_uint8
  604. # seed the RNG
  605. prngseed = b'abc123'
  606. lora_comms.strobe_seed_prng((c_uint8 *
  607. len(prngseed))(*prngseed), len(prngseed))
  608. # Create the state for testing
  609. commstate = lora_comms.CommsState()
  610. # These are the expected messages and their arguments
  611. exptmsgs = [
  612. (CMD_WAITFOR, [ 30 ]),
  613. (CMD_RUNFOR, [ 1, 50 ]),
  614. (CMD_PING, [ ]),
  615. (CMD_TERMINATE, [ ]),
  616. ]
  617. def procmsg(msg, outbuf):
  618. msgbuf = msg._from()
  619. cmd = msgbuf[0]
  620. args = [ int.from_bytes(msgbuf[x:x + 4],
  621. byteorder='little') for x in range(1, len(msgbuf),
  622. 4) ]
  623. if exptmsgs[0] == (cmd, args):
  624. exptmsgs.pop(0)
  625. outbuf[0].pkt[0] = cmd
  626. outbuf[0].pktlen = 1
  627. else: #pragma: no cover
  628. raise RuntimeError('cmd not found')
  629. # wrap the callback function
  630. cb = lora_comms.process_msgfunc_t(procmsg)
  631. class CCodeSD(MockSyncDatagram):
  632. async def runner(self):
  633. for expectlen in [ 24, 17, 9, 9, 9, 9 ]:
  634. # get message
  635. inmsg = await self.get()
  636. # process the test message
  637. out = lora_comms.comms_process_wrap(
  638. commstate, inmsg)
  639. # make sure the reply matches length
  640. _self.assertEqual(expectlen, len(out))
  641. # save what was originally replied
  642. origmsg = out
  643. # pretend that the reply didn't make it
  644. out = lora_comms.comms_process_wrap(
  645. commstate, inmsg)
  646. # make sure that the reply matches
  647. # the previous
  648. _self.assertEqual(origmsg, out)
  649. # pass the reply back
  650. await self.put(out)
  651. # Generate shared key
  652. shared_key = os.urandom(32)
  653. # Initialize everything
  654. lora_comms.comms_init(commstate, cb, make_pktbuf(shared_key))
  655. # Create test fixture
  656. tsd = CCodeSD()
  657. l = LORANode(tsd, shared=shared_key)
  658. # Send various messages
  659. await l.start()
  660. await l.waitfor(30)
  661. await l.runfor(1, 50)
  662. await l.ping()
  663. await l.terminate()
  664. await tsd.drain()
  665. # Make sure all messages have been processed
  666. self.assertTrue(tsd.sendq.empty())
  667. self.assertTrue(tsd.recvq.empty())
  668. # Make sure all the expected messages have been
  669. # processed.
  670. self.assertFalse(exptmsgs)
  671. #_debprint('done')
  672. @timeout(2)
  673. async def test_ccode_newsession(self):
  674. '''This test is to make sure that if an existing session
  675. is running, that a new session can be established, and that
  676. when it does, the old session becomes inactive.
  677. '''
  678. _self = self
  679. from ctypes import c_uint8
  680. seq = AsyncSequence()
  681. # seed the RNG
  682. prngseed = b'abc123'
  683. lora_comms.strobe_seed_prng((c_uint8 *
  684. len(prngseed))(*prngseed), len(prngseed))
  685. # Create the state for testing
  686. commstate = lora_comms.CommsState()
  687. # These are the expected messages and their arguments
  688. exptmsgs = [
  689. (CMD_WAITFOR, [ 30 ]),
  690. (CMD_WAITFOR, [ 70 ]),
  691. (CMD_WAITFOR, [ 40 ]),
  692. (CMD_TERMINATE, [ ]),
  693. ]
  694. def procmsg(msg, outbuf):
  695. msgbuf = msg._from()
  696. cmd = msgbuf[0]
  697. args = [ int.from_bytes(msgbuf[x:x + 4],
  698. byteorder='little') for x in range(1, len(msgbuf),
  699. 4) ]
  700. if exptmsgs[0] == (cmd, args):
  701. exptmsgs.pop(0)
  702. outbuf[0].pkt[0] = cmd
  703. outbuf[0].pktlen = 1
  704. else: #pragma: no cover
  705. raise RuntimeError('cmd not found: %d' % cmd)
  706. # wrap the callback function
  707. cb = lora_comms.process_msgfunc_t(procmsg)
  708. class FlipMsg(object):
  709. async def flipmsg(self):
  710. # get message
  711. inmsg = await self.get()
  712. # process the test message
  713. out = lora_comms.comms_process_wrap(
  714. commstate, inmsg)
  715. # pass the reply back
  716. await self.put(out)
  717. # this class always passes messages, this is
  718. # used for the first session.
  719. class CCodeSD1(MockSyncDatagram, FlipMsg):
  720. async def runner(self):
  721. for i in range(3):
  722. await self.flipmsg()
  723. async with seq.sync(0):
  724. # create bogus message
  725. inmsg = b'0'*24
  726. # process the bogus message
  727. out = lora_comms.comms_process_wrap(
  728. commstate, inmsg)
  729. # make sure there was not a response
  730. _self.assertFalse(out)
  731. await self.flipmsg()
  732. # this one is special in that it will pause after the first
  733. # message to ensure that the previous session will continue
  734. # to work, AND that if a new "new" session comes along, it
  735. # will override the previous new session that hasn't been
  736. # confirmed yet.
  737. class CCodeSD2(MockSyncDatagram, FlipMsg):
  738. async def runner(self):
  739. # pass one message from the new session
  740. async with seq.sync(1):
  741. # There might be a missing case
  742. # handled for when the confirmed
  743. # message is generated, but lost.
  744. await self.flipmsg()
  745. # and the old session is still active
  746. await l.waitfor(70)
  747. async with seq.sync(2):
  748. for i in range(3):
  749. await self.flipmsg()
  750. # Generate shared key
  751. shared_key = os.urandom(32)
  752. # Initialize everything
  753. lora_comms.comms_init(commstate, cb, make_pktbuf(shared_key))
  754. # Create test fixture
  755. tsd = CCodeSD1()
  756. l = LORANode(tsd, shared=shared_key)
  757. # Send various messages
  758. await l.start()
  759. await l.waitfor(30)
  760. # Ensure that a new one can take over
  761. tsd2 = CCodeSD2()
  762. l2 = LORANode(tsd2, shared=shared_key)
  763. # Send various messages
  764. await l2.start()
  765. await l2.waitfor(40)
  766. await l2.terminate()
  767. await tsd.drain()
  768. await tsd2.drain()
  769. # Make sure all messages have been processed
  770. self.assertTrue(tsd.sendq.empty())
  771. self.assertTrue(tsd.recvq.empty())
  772. self.assertTrue(tsd2.sendq.empty())
  773. self.assertTrue(tsd2.recvq.empty())
  774. # Make sure all the expected messages have been
  775. # processed.
  776. self.assertFalse(exptmsgs)
  777. class TestLoRaNodeMulticast(unittest.IsolatedAsyncioTestCase):
  778. # see: https://www.iana.org/assignments/multicast-addresses/multicast-addresses.xhtml#multicast-addresses-1
  779. maddr = ('224.0.0.198', 48542)
  780. @timeout(2)
  781. async def test_multisyncdgram(self):
  782. # Test the implementation of the multicast version of
  783. # SyncDatagram
  784. _self = self
  785. from ctypes import c_uint8
  786. # seed the RNG
  787. prngseed = b'abc123'
  788. lora_comms.strobe_seed_prng((c_uint8 *
  789. len(prngseed))(*prngseed), len(prngseed))
  790. # Create the state for testing
  791. commstate = lora_comms.CommsState()
  792. # These are the expected messages and their arguments
  793. exptmsgs = [
  794. (CMD_WAITFOR, [ 30 ]),
  795. (CMD_PING, [ ]),
  796. (CMD_TERMINATE, [ ]),
  797. ]
  798. def procmsg(msg, outbuf):
  799. msgbuf = msg._from()
  800. cmd = msgbuf[0]
  801. args = [ int.from_bytes(msgbuf[x:x + 4],
  802. byteorder='little') for x in range(1, len(msgbuf),
  803. 4) ]
  804. if exptmsgs[0] == (cmd, args):
  805. exptmsgs.pop(0)
  806. outbuf[0].pkt[0] = cmd
  807. outbuf[0].pktlen = 1
  808. else: #pragma: no cover
  809. raise RuntimeError('cmd not found')
  810. # wrap the callback function
  811. cb = lora_comms.process_msgfunc_t(procmsg)
  812. # Generate shared key
  813. shared_key = os.urandom(32)
  814. # Initialize everything
  815. lora_comms.comms_init(commstate, cb, make_pktbuf(shared_key))
  816. # create the object we are testing
  817. msd = MulticastSyncDatagram(self.maddr)
  818. seq = AsyncSequence()
  819. async def clienttask():
  820. mr = await multicast.create_multicast_receiver(
  821. self.maddr)
  822. mt = await multicast.create_multicast_transmitter(
  823. self.maddr)
  824. try:
  825. # make sure the above threads are running
  826. await seq.simpsync(0)
  827. while True:
  828. pkt = await mr.recv()
  829. msg = pkt[0]
  830. out = lora_comms.comms_process_wrap(
  831. commstate, msg)
  832. if out:
  833. await mt.send(out)
  834. finally:
  835. mr.close()
  836. mt.close()
  837. task = asyncio.create_task(clienttask())
  838. # start it
  839. await msd.start()
  840. # pass it to a node
  841. l = LORANode(msd, shared=shared_key)
  842. await seq.simpsync(1)
  843. # Send various messages
  844. await l.start()
  845. await l.waitfor(30)
  846. await l.ping()
  847. await l.terminate()
  848. # shut things down
  849. ln = None
  850. msd.close()
  851. task.cancel()
  852. with self.assertRaises(asyncio.CancelledError):
  853. await task