Implement a secure ICS protocol targeting LoRa Node151 microcontroller for controlling irrigation.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

1433 lines
34 KiB

  1. # Copyright 2021 John-Mark Gurney.
  2. #
  3. # Redistribution and use in source and binary forms, with or without
  4. # modification, are permitted provided that the following conditions
  5. # are met:
  6. # 1. Redistributions of source code must retain the above copyright
  7. # notice, this list of conditions and the following disclaimer.
  8. # 2. Redistributions in binary form must reproduce the above copyright
  9. # notice, this list of conditions and the following disclaimer in the
  10. # documentation and/or other materials provided with the distribution.
  11. #
  12. # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
  13. # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  14. # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  15. # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
  16. # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
  17. # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
  18. # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
  19. # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
  20. # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
  21. # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
  22. # SUCH DAMAGE.
  23. #
  24. import asyncio
  25. import contextlib
  26. import functools
  27. import itertools
  28. import os
  29. import sys
  30. import unittest
  31. from Strobe.Strobe import Strobe, KeccakF
  32. from Strobe.Strobe import AuthenticationFailed
  33. import syote_comms
  34. from syote_comms import make_pktbuf, X25519
  35. import multicast
  36. from util import *
# Response to command will be the CMD and any arguments if needed.
# The command is encoded as an unsigned byte
CMD_TERMINATE = 1 # no args: terminate the session, reply confirms

# The following commands are queued up, but will be acknowledged when queued
CMD_WAITFOR = 2 # arg: (length): waits for length seconds
CMD_RUNFOR = 3 # arg: (chan, length): turns on chan for length seconds
CMD_PING = 4 # arg: (): a no op command
CMD_SETUNSET = 5 # arg: (chan, val): sets chan to val
CMD_ADV = 6 # arg: ([cnt]): advances to the next cnt (default 1) command
CMD_CLEAR = 7 # arg: (): clears all future commands, but keeps current running
  47. class LORANode(object):
  48. '''Implement a LORANode initiator.
  49. There are currently two implemented modes, one is shared, and then
  50. a shared key must be provided to the shared keyword argument.
  51. The other is ecdhe mode, which requires an X25519 key to be passed
  52. in to init_key, and the respondent's public key to be passed in to
  53. resp_pub.
  54. '''
  55. SHARED_DOMAIN = b'com.funkthat.lora.irrigation.shared.v0.0.1'
  56. ECDHE_DOMAIN = b'com.funkthat.lora.irrigation.ecdhe.v0.0.1'
  57. MAC_LEN = 8
  58. def __init__(self, syncdatagram, shared=None, init_key=None, resp_pub=None):
  59. self.sd = syncdatagram
  60. if shared is not None:
  61. self.st = Strobe(self.SHARED_DOMAIN, F=KeccakF(800))
  62. self.st.key(shared)
  63. self.start = self.shared_start
  64. elif init_key is not None and resp_pub is not None:
  65. self.st = Strobe(self.ECDHE_DOMAIN, F=KeccakF(800))
  66. self.key = init_key
  67. self.resp_pub = resp_pub
  68. self.st.key(init_key.getpub() + resp_pub)
  69. self.start = self.ecdhe_start
  70. else:
  71. raise RuntimeError('invalid combination of keys provided')
  72. async def shared_start(self):
  73. resp = await self.sendrecvvalid(os.urandom(16) + b'reqreset')
  74. self.st.ratchet()
  75. pkt = await self.sendrecvvalid(b'confirm')
  76. if pkt != b'confirmed':
  77. raise RuntimeError('got invalid response: %s' %
  78. repr(pkt))
  79. async def ecdhe_start(self):
  80. ephkey = X25519.gen()
  81. resp = await self.sendrecvvalid(ephkey.getpub() + b'reqreset',
  82. fun=lambda: self.st.key(ephkey.dh(self.resp_pub) + self.key.dh(self.resp_pub)))
  83. self.st.key(ephkey.dh(resp) + self.key.dh(resp))
  84. pkt = await self.sendrecvvalid(b'confirm')
  85. if pkt != b'confirmed':
  86. raise RuntimeError('got invalid response: %s' %
  87. repr(pkt))
  88. async def sendrecvvalid(self, msg, fun=None):
  89. msg = self.st.send_enc(msg) + self.st.send_mac(self.MAC_LEN)
  90. if fun is not None:
  91. fun()
  92. origstate = self.st.copy()
  93. while True:
  94. resp = await self.sd.sendtillrecv(msg, .50)
  95. #_debprint('got:', resp)
  96. # skip empty messages
  97. if len(resp) == 0:
  98. continue
  99. try:
  100. decmsg = self.st.recv_enc(resp[:-self.MAC_LEN])
  101. self.st.recv_mac(resp[-self.MAC_LEN:])
  102. break
  103. except AuthenticationFailed:
  104. # didn't get a valid packet, restore
  105. # state and retry
  106. #_debprint('failed')
  107. self.st.set_state_from(origstate)
  108. #_debprint('got rep:', repr(resp), repr(decmsg))
  109. return decmsg
  110. @staticmethod
  111. def _encodeargs(*args):
  112. r = []
  113. for i in args:
  114. r.append(i.to_bytes(4, byteorder='little'))
  115. return b''.join(r)
  116. async def _sendcmd(self, cmd, *args):
  117. cmdbyte = cmd.to_bytes(1, byteorder='little')
  118. resp = await self.sendrecvvalid(cmdbyte + self._encodeargs(*args))
  119. if resp[0:1] != cmdbyte:
  120. raise RuntimeError(
  121. 'response does not match, got: %s, expected: %s' %
  122. (repr(resp[0:1]), repr(cmdbyte)))
  123. async def waitfor(self, length):
  124. return await self._sendcmd(CMD_WAITFOR, length)
  125. async def runfor(self, chan, length):
  126. return await self._sendcmd(CMD_RUNFOR, chan, length)
  127. async def setunset(self, chan, val):
  128. return await self._sendcmd(CMD_SETUNSET, chan, val)
  129. async def ping(self):
  130. return await self._sendcmd(CMD_PING)
  131. async def adv(self, cnt=None):
  132. args = ()
  133. if cnt is not None:
  134. args = (cnt, )
  135. return await self._sendcmd(CMD_ADV, *args)
  136. async def clear(self):
  137. return await self._sendcmd(CMD_CLEAR)
  138. async def terminate(self):
  139. return await self._sendcmd(CMD_TERMINATE)
  140. class SyncDatagram(object):
  141. '''Base interface for a more simple synchronous interface.'''
  142. async def recv(self, timeout=None): #pragma: no cover
  143. '''Receive a datagram. If timeout is not None, wait that many
  144. seconds, and if nothing is received in that time, raise an
  145. asyncio.TimeoutError exception.'''
  146. raise NotImplementedError
  147. async def send(self, data): #pragma: no cover
  148. raise NotImplementedError
  149. async def sendtillrecv(self, data, freq):
  150. '''Send the datagram in data, every freq seconds until a datagram
  151. is received. If timeout seconds happen w/o receiving a datagram,
  152. then raise an TimeoutError exception.'''
  153. while True:
  154. #_debprint('sending:', repr(data))
  155. await self.send(data)
  156. try:
  157. return await self.recv(freq)
  158. except asyncio.TimeoutError:
  159. pass
  160. class MulticastSyncDatagram(SyncDatagram):
  161. '''
  162. An implementation of SyncDatagram that uses the provided
  163. multicast address maddr as the source/sink of the packets.
  164. Note that once created, the start coroutine needs to be
  165. await'd before being passed to a LORANode so that everything
  166. is running.
  167. '''
  168. # Note: sent packets will be received. A similar method to
  169. # what was done in multicast.{to,from}_loragw could be done
  170. # here as well, that is passing in a set of packets to not
  171. # pass back up.
  172. def __init__(self, maddr):
  173. self.maddr = maddr
  174. self._ignpkts = set()
  175. async def start(self):
  176. self.mr = await multicast.create_multicast_receiver(self.maddr)
  177. self.mt = await multicast.create_multicast_transmitter(
  178. self.maddr)
  179. async def _recv(self):
  180. while True:
  181. pkt = await self.mr.recv()
  182. pkt = pkt[0]
  183. if pkt not in self._ignpkts:
  184. return pkt
  185. self._ignpkts.remove(pkt)
  186. async def recv(self, timeout=None): #pragma: no cover
  187. r = await asyncio.wait_for(self._recv(), timeout=timeout)
  188. return r
  189. async def send(self, data): #pragma: no cover
  190. self._ignpkts.add(bytes(data))
  191. await self.mt.send(data)
  192. def close(self):
  193. '''Shutdown communications.'''
  194. self.mr.close()
  195. self.mr = None
  196. self.mt.close()
  197. self.mt = None
  198. def listsplit(lst, item):
  199. try:
  200. idx = lst.index(item)
  201. except ValueError:
  202. return lst, []
  203. return lst[:idx], lst[idx + 1:]
  204. async def main():
  205. import argparse
  206. from loraserv import DEFAULT_MADDR as maddr
  207. parser = argparse.ArgumentParser()
  208. parser.add_argument('-f', dest='schedfile', metavar='filename', type=str,
  209. help='Use commands from the file. One command per line.')
  210. parser.add_argument('-r', dest='client', metavar='module:function', type=str,
  211. help='Create a respondant instead of sending commands. Commands will be passed to the function.')
  212. parser.add_argument('-s', dest='shared_key', metavar='shared_key', type=str, required=True,
  213. help='The shared key (encoded as UTF-8) to use.')
  214. parser.add_argument('args', metavar='CMD_ARG', type=str, nargs='*',
  215. help='Various commands to send to the device.')
  216. args = parser.parse_args()
  217. shared_key = args.shared_key.encode('utf-8')
  218. if args.client:
  219. # Run a client
  220. mr = await multicast.create_multicast_receiver(maddr)
  221. mt = await multicast.create_multicast_transmitter(maddr)
  222. from ctypes import c_uint8
  223. # seed the RNG
  224. prngseed = os.urandom(64)
  225. syote_comms.strobe_seed_prng((c_uint8 *
  226. len(prngseed))(*prngseed), len(prngseed))
  227. # Create the state for testing
  228. commstate = syote_comms.CommsState()
  229. import util_load
  230. client_func = util_load.load_application(args.client)
  231. def client_call(msg, outbuf):
  232. ret = client_func(msg._from())
  233. if len(ret) > outbuf[0].pktlen:
  234. ret = b'error, too long buffer: %d' % len(ret)
  235. outbuf[0].pktlen = min(len(ret), outbuf[0].pktlen)
  236. for i in range(outbuf[0].pktlen):
  237. outbuf[0].pkt[i] = ret[i]
  238. cb = syote_comms.process_msgfunc_t(client_call)
  239. # Initialize everything
  240. syote_comms.comms_init(commstate, cb, make_pktbuf(shared_key))
  241. try:
  242. while True:
  243. pkt = await mr.recv()
  244. msg = pkt[0]
  245. out = syote_comms.comms_process_wrap(
  246. commstate, msg)
  247. if out:
  248. await mt.send(out)
  249. finally:
  250. mr.close()
  251. mt.close()
  252. sys.exit(0)
  253. msd = MulticastSyncDatagram(maddr)
  254. await msd.start()
  255. l = LORANode(msd, shared=shared_key)
  256. await l.start()
  257. valid_cmds = {
  258. 'waitfor', 'setunset', 'runfor', 'ping', 'adv', 'clear',
  259. 'terminate',
  260. }
  261. if args.args and args.schedfile:
  262. parser.error('only one of -f or arguments can be specified.')
  263. if args.args:
  264. cmds = list(args.args)
  265. cmdargs = []
  266. while cmds:
  267. a, cmds = listsplit(cmds, '--')
  268. cmdargs.append(a)
  269. else:
  270. with open(args.schedfile) as fp:
  271. cmdargs = [ x.split() for x in fp.readlines() ]
  272. while cmdargs:
  273. cmd, *args = cmdargs.pop(0)
  274. if cmd not in valid_cmds:
  275. print('invalid command:', repr(cmd))
  276. sys.exit(1)
  277. fun = getattr(l, cmd)
  278. await fun(*(int(x) for x in args))
# Script entry point: run the command line tool when executed directly.
# NOTE: this guard is evaluated at this point in the module, so the
# classes defined below it do not exist yet while main() is running.
if __name__ == '__main__':
    asyncio.run(main())
  281. class MockSyncDatagram(SyncDatagram):
  282. '''A testing version of SyncDatagram. Define a method runner which
  283. implements part of the sequence. In the function, await on either
  284. self.get, to wait for the other side to send something, or await
  285. self.put w/ data to send.'''
  286. def __init__(self):
  287. self.sendq = asyncio.Queue()
  288. self.recvq = asyncio.Queue()
  289. self.task = asyncio.create_task(self.runner())
  290. self.get = self.sendq.get
  291. self.put = self.recvq.put
  292. async def drain(self):
  293. '''Wait for the runner thread to finish up.'''
  294. return await self.task
  295. async def runner(self): #pragma: no cover
  296. raise NotImplementedError
  297. async def recv(self, timeout=None):
  298. return await self.recvq.get()
  299. async def send(self, data):
  300. return await self.sendq.put(data)
  301. def __del__(self): #pragma: no cover
  302. if self.task is not None and not self.task.done():
  303. self.task.cancel()
  304. class TestSyncData(unittest.IsolatedAsyncioTestCase):
  305. async def test_syncsendtillrecv(self):
  306. class MySync(SyncDatagram):
  307. def __init__(self):
  308. self.sendq = []
  309. self.resp = [ asyncio.TimeoutError(), b'a' ]
  310. async def recv(self, timeout=None):
  311. assert timeout == 1
  312. r = self.resp.pop(0)
  313. if isinstance(r, Exception):
  314. raise r
  315. return r
  316. async def send(self, data):
  317. self.sendq.append(data)
  318. ms = MySync()
  319. r = await ms.sendtillrecv(b'foo', 1)
  320. self.assertEqual(r, b'a')
  321. self.assertEqual(ms.sendq, [ b'foo', b'foo' ])
  322. class AsyncSequence(object):
  323. '''
  324. Object used for sequencing async functions. To use, use the
  325. asynchronous context manager created by the sync method. For
  326. example:
  327. seq = AsyncSequence()
  328. async func1():
  329. async with seq.sync(1):
  330. second_fun()
  331. async func2():
  332. async with seq.sync(0):
  333. first_fun()
  334. This will make sure that function first_fun is run before running
  335. the function second_fun. If a previous block raises an Exception,
  336. it will be passed up, and all remaining blocks (and future ones)
  337. will raise a CancelledError to help ensure that any tasks are
  338. properly cleaned up.
  339. '''
  340. def __init__(self, positerfactory=lambda: itertools.count()):
  341. '''The argument positerfactory, is a factory that will
  342. create an iterator that will be used for the values that
  343. are passed to the sync method.'''
  344. self.positer = positerfactory()
  345. self.token = object()
  346. self.die = False
  347. self.waiting = {
  348. next(self.positer): self.token
  349. }
  350. async def simpsync(self, pos):
  351. async with self.sync(pos):
  352. pass
  353. @contextlib.asynccontextmanager
  354. async def sync(self, pos):
  355. '''An async context manager that will be run when it's
  356. turn arrives. It will only run when all the previous
  357. items in the iterator has been successfully run.'''
  358. if self.die:
  359. raise asyncio.CancelledError('seq cancelled')
  360. if pos in self.waiting:
  361. if self.waiting[pos] is not self.token:
  362. raise RuntimeError('pos already waiting!')
  363. else:
  364. fut = asyncio.Future()
  365. self.waiting[pos] = fut
  366. await fut
  367. # our time to shine!
  368. del self.waiting[pos]
  369. try:
  370. yield None
  371. except Exception as e:
  372. # if we got an exception, things went pear shaped,
  373. # shut everything down, and any future calls.
  374. #_debprint('dieing...', repr(e))
  375. self.die = True
  376. # cancel existing blocks
  377. while self.waiting:
  378. k, v = self.waiting.popitem()
  379. #_debprint('canceling: %s' % repr(k))
  380. if v is self.token:
  381. continue
  382. # for Python 3.9:
  383. # msg='pos %s raised exception: %s' %
  384. # (repr(pos), repr(e))
  385. v.cancel()
  386. # populate real exception up
  387. raise
  388. else:
  389. # handle next
  390. nextpos = next(self.positer)
  391. if nextpos in self.waiting:
  392. #_debprint('np:', repr(self), nextpos,
  393. # repr(self.waiting[nextpos]))
  394. self.waiting[nextpos].set_result(None)
  395. else:
  396. self.waiting[nextpos] = self.token
  397. class TestSequencing(unittest.IsolatedAsyncioTestCase):
  398. @timeout(2)
  399. async def test_seq_alreadywaiting(self):
  400. waitseq = AsyncSequence()
  401. seq = AsyncSequence()
  402. async def fun1():
  403. async with waitseq.sync(1):
  404. pass
  405. async def fun2():
  406. async with seq.sync(1):
  407. async with waitseq.sync(1): # pragma: no cover
  408. pass
  409. task1 = asyncio.create_task(fun1())
  410. task2 = asyncio.create_task(fun2())
  411. # spin things to make sure things advance
  412. await asyncio.sleep(0)
  413. async with seq.sync(0):
  414. pass
  415. with self.assertRaises(RuntimeError):
  416. await task2
  417. async with waitseq.sync(0):
  418. pass
  419. await task1
  420. @timeout(2)
  421. async def test_seqexc(self):
  422. seq = AsyncSequence()
  423. excseq = AsyncSequence()
  424. async def excfun1():
  425. async with seq.sync(1):
  426. pass
  427. async with excseq.sync(0):
  428. raise ValueError('foo')
  429. # that a block that enters first, but runs after
  430. # raises an exception
  431. async def excfun2():
  432. async with seq.sync(0):
  433. pass
  434. async with excseq.sync(1): # pragma: no cover
  435. pass
  436. # that a block that enters after, raises an
  437. # exception
  438. async def excfun3():
  439. async with seq.sync(2):
  440. pass
  441. async with excseq.sync(2): # pragma: no cover
  442. pass
  443. task1 = asyncio.create_task(excfun1())
  444. task2 = asyncio.create_task(excfun2())
  445. task3 = asyncio.create_task(excfun3())
  446. with self.assertRaises(ValueError):
  447. await task1
  448. with self.assertRaises(asyncio.CancelledError):
  449. await task2
  450. with self.assertRaises(asyncio.CancelledError):
  451. await task3
  452. @timeout(2)
  453. async def test_seq(self):
  454. # test that a seq object when created
  455. seq = AsyncSequence(lambda: itertools.count(1))
  456. col = []
  457. async def fun1():
  458. async with seq.sync(1):
  459. col.append(1)
  460. async with seq.sync(2):
  461. col.append(2)
  462. async with seq.sync(4):
  463. col.append(4)
  464. async def fun2():
  465. async with seq.sync(3):
  466. col.append(3)
  467. async with seq.sync(6):
  468. col.append(6)
  469. async def fun3():
  470. async with seq.sync(5):
  471. col.append(5)
  472. # and various functions are run
  473. task1 = asyncio.create_task(fun1())
  474. task2 = asyncio.create_task(fun2())
  475. task3 = asyncio.create_task(fun3())
  476. # and the functions complete
  477. await task3
  478. await task2
  479. await task1
  480. # that the order they ran in was correct
  481. self.assertEqual(col, list(range(1, 7)))
  482. class TestLORANode(unittest.IsolatedAsyncioTestCase):
  483. shared_domain = b'com.funkthat.lora.irrigation.shared.v0.0.1'
  484. ecdhe_domain = b'com.funkthat.lora.irrigation.ecdhe.v0.0.1'
  485. def test_initparams(self):
  486. # make sure no keys fails
  487. with self.assertRaises(RuntimeError):
  488. l = LORANode(None)
    @timeout(2)
    async def test_lora_ecdhe(self):
        # Run LORANode in ecdhe mode against a respondent implemented
        # directly w/ Strobe in the runner below.  The runner mirrors
        # every keying step of the initiator, so the order of the
        # Strobe operations in it must not be changed.
        _self = self
        initkey = X25519.gen()
        respkey = X25519.gen()

        class TestSD(MockSyncDatagram):
            async def sendgettest(self, msg):
                '''Send the message, but make sure that if a
                bad message is sent afterward, that it replies
                w/ the same previous message.

                Returns the (repeated) message that the
                initiator sent in response to msg.
                '''

                await self.put(msg)
                resp = await self.get()

                await self.put(b'bogusmsg' * 5)

                resp2 = await self.get()

                _self.assertEqual(resp, resp2)

                return resp

            async def runner(self):
                # as respondent
                l = Strobe(_self.ecdhe_domain, F=KeccakF(800))

                l.key(initkey.getpub() + respkey.getpub())

                # start handshake
                r = await self.get()

                # get eph key w/ reqreset
                pkt = l.recv_enc(r[:-8])
                l.recv_mac(r[-8:])

                assert pkt.endswith(b'reqreset')

                ephpub = pkt[:-len(b'reqreset')]

                # make sure junk gets ignored
                await self.put(b'sdlfkj')

                # and that the packet remains the same
                _self.assertEqual(r, await self.get())

                # and a couple more times
                await self.put(b'0' * 24)
                _self.assertEqual(r, await self.get())
                await self.put(b'0' * 32)
                _self.assertEqual(r, await self.get())

                # update the keys
                l.key(respkey.dh(ephpub) + respkey.dh(initkey.getpub()))

                # generate our eph key
                ephkey = X25519.gen()

                # send the response
                await self.put(l.send_enc(ephkey.getpub()) +
                    l.send_mac(8))

                l.key(ephkey.dh(ephpub) + ephkey.dh(initkey.getpub()))

                # get the confirmation message
                r = await self.get()

                # test the resend capabilities
                await self.put(b'0' * 24)
                _self.assertEqual(r, await self.get())

                # decode confirmation message
                c = l.recv_enc(r[:-8])
                l.recv_mac(r[-8:])

                # assert that we got it
                _self.assertEqual(c, b'confirm')

                # send confirmed reply
                r = await self.sendgettest(l.send_enc(
                    b'confirmed') + l.send_mac(8))

                # test and decode remaining command messages
                cmd = l.recv_enc(r[:-8])
                l.recv_mac(r[-8:])

                assert cmd[0] == CMD_WAITFOR
                assert int.from_bytes(cmd[1:],
                    byteorder='little') == 30

                # acknowledge by echoing the command byte
                r = await self.sendgettest(l.send_enc(
                    cmd[0:1]) + l.send_mac(8))

                cmd = l.recv_enc(r[:-8])
                l.recv_mac(r[-8:])

                assert cmd[0] == CMD_RUNFOR
                assert int.from_bytes(cmd[1:5],
                    byteorder='little') == 1
                assert int.from_bytes(cmd[5:],
                    byteorder='little') == 50

                r = await self.sendgettest(l.send_enc(
                    cmd[0:1]) + l.send_mac(8))

                cmd = l.recv_enc(r[:-8])
                l.recv_mac(r[-8:])

                assert cmd[0] == CMD_TERMINATE

                # last reply, so no bogus message/resend check
                await self.put(l.send_enc(cmd[0:1]) +
                    l.send_mac(8))

        tsd = TestSD()

        # make sure it fails w/o both specified
        with self.assertRaises(RuntimeError):
            l = LORANode(tsd, init_key=initkey)

        with self.assertRaises(RuntimeError):
            l = LORANode(tsd, resp_pub=respkey.getpub())

        l = LORANode(tsd, init_key=initkey, resp_pub=respkey.getpub())

        await l.start()

        await l.waitfor(30)

        await l.runfor(1, 50)

        await l.terminate()

        # wait for the runner to finish, this also passes up any
        # exception that was raised in it
        await tsd.drain()

        # Make sure all messages have been processed
        self.assertTrue(tsd.sendq.empty())
        self.assertTrue(tsd.recvq.empty())
        #_debprint('done')
    @timeout(2)
    async def test_lora_shared(self):
        # Run LORANode in shared key mode against a respondent
        # implemented directly w/ Strobe in the runner below.  The
        # runner mirrors every keying step of the initiator, so the
        # order of the Strobe operations in it must not be changed.
        _self = self
        shared_key = os.urandom(32)

        class TestSD(MockSyncDatagram):
            async def sendgettest(self, msg):
                '''Send the message, but make sure that if a
                bad message is sent afterward, that it replies
                w/ the same previous message.

                Returns the (repeated) message that the
                initiator sent in response to msg.
                '''

                await self.put(msg)
                resp = await self.get()

                await self.put(b'bogusmsg' * 5)

                resp2 = await self.get()

                _self.assertEqual(resp, resp2)

                return resp

            async def runner(self):
                # as respondent
                l = Strobe(TestLORANode.shared_domain, F=KeccakF(800))

                l.key(shared_key)

                # start handshake
                r = await self.get()

                pkt = l.recv_enc(r[:-8])
                l.recv_mac(r[-8:])

                assert pkt.endswith(b'reqreset')

                # make sure junk gets ignored
                await self.put(b'sdlfkj')

                # and that the packet remains the same
                _self.assertEqual(r, await self.get())

                # and a couple more times
                await self.put(b'0' * 24)
                _self.assertEqual(r, await self.get())
                await self.put(b'0' * 32)
                _self.assertEqual(r, await self.get())

                # send the response
                await self.put(l.send_enc(os.urandom(16)) +
                    l.send_mac(8))

                # require no more back tracking at this point
                l.ratchet()

                # get the confirmation message
                r = await self.get()

                # test the resend capabilities
                await self.put(b'0' * 24)
                _self.assertEqual(r, await self.get())

                # decode confirmation message
                c = l.recv_enc(r[:-8])
                l.recv_mac(r[-8:])

                # assert that we got it
                _self.assertEqual(c, b'confirm')

                # send confirmed reply
                r = await self.sendgettest(l.send_enc(
                    b'confirmed') + l.send_mac(8))

                # test and decode remaining command messages
                cmd = l.recv_enc(r[:-8])
                l.recv_mac(r[-8:])

                assert cmd[0] == CMD_WAITFOR
                assert int.from_bytes(cmd[1:],
                    byteorder='little') == 30

                # acknowledge by echoing the command byte
                r = await self.sendgettest(l.send_enc(
                    cmd[0:1]) + l.send_mac(8))

                cmd = l.recv_enc(r[:-8])
                l.recv_mac(r[-8:])

                assert cmd[0] == CMD_RUNFOR
                assert int.from_bytes(cmd[1:5],
                    byteorder='little') == 1
                assert int.from_bytes(cmd[5:],
                    byteorder='little') == 50

                r = await self.sendgettest(l.send_enc(
                    cmd[0:1]) + l.send_mac(8))

                cmd = l.recv_enc(r[:-8])
                l.recv_mac(r[-8:])

                assert cmd[0] == CMD_TERMINATE

                # last reply, so no bogus message/resend check
                await self.put(l.send_enc(cmd[0:1]) +
                    l.send_mac(8))

        tsd = TestSD()
        l = LORANode(tsd, shared=shared_key)

        await l.start()

        await l.waitfor(30)

        await l.runfor(1, 50)

        await l.terminate()

        # wait for the runner to finish, this also passes up any
        # exception that was raised in it
        await tsd.drain()

        # Make sure all messages have been processed
        self.assertTrue(tsd.sendq.empty())
        self.assertTrue(tsd.recvq.empty())
        #_debprint('done')
    @timeout(2)
    async def test_ccode_badmsgs(self):
        # Test to make sure that various bad messages in the
        # handshake process are rejected even if the attacker
        # has the correct key.  This just keeps the protocol
        # tight allowing for variations in the future.
        #
        # Bad messages are always built from a copy of the crypto
        # state, so that the real state in l.st stays in step w/
        # the C code for the valid messages.

        # seed the RNG
        prngseed = b'abc123'
        from ctypes import c_uint8
        syote_comms.strobe_seed_prng((c_uint8 *
            len(prngseed))(*prngseed), len(prngseed))

        # Create the state for testing
        commstate = syote_comms.CommsState()

        # the callback is required, but does nothing in this test
        cb = syote_comms.process_msgfunc_t(lambda msg, outbuf: None)

        # Generate shared key
        shared_key = os.urandom(32)

        # Initialize everything
        syote_comms.comms_init(commstate, cb, make_pktbuf(shared_key), None, None)

        # Create test fixture, only use it to init crypto state
        tsd = SyncDatagram()
        l = LORANode(tsd, shared=shared_key)

        # copy the crypto state
        cstate = l.st.copy()

        # compose an incorrect init message
        msg = os.urandom(16) + b'othre'
        msg = cstate.send_enc(msg) + cstate.send_mac(l.MAC_LEN)

        out = syote_comms.comms_process_wrap(commstate, msg)

        self.assertFalse(out)

        # that various short messages don't cause problems
        for i in range(10):
            out = syote_comms.comms_process_wrap(commstate, b'0' * i)

            self.assertFalse(out)

        # copy the crypto state
        cstate = l.st.copy()

        # compose an incorrect init message
        msg = os.urandom(16) + b' eqreset'
        msg = cstate.send_enc(msg) + cstate.send_mac(l.MAC_LEN)

        out = syote_comms.comms_process_wrap(commstate, msg)

        self.assertFalse(out)

        # compose the correct init message
        msg = os.urandom(16) + b'reqreset'
        msg = l.st.send_enc(msg) + l.st.send_mac(l.MAC_LEN)

        out = syote_comms.comms_process_wrap(commstate, msg)

        # process the reply the same way that shared_start does
        l.st.recv_enc(out[:-l.MAC_LEN])
        l.st.recv_mac(out[-l.MAC_LEN:])

        l.st.ratchet()

        # copy the crypto state
        cstate = l.st.copy()

        # compose an incorrect confirmed message
        msg = b'onfirm'
        msg = cstate.send_enc(msg) + cstate.send_mac(l.MAC_LEN)

        out = syote_comms.comms_process_wrap(commstate, msg)

        self.assertFalse(out)

        # copy the crypto state
        cstate = l.st.copy()

        # compose an incorrect confirmed message
        msg = b' onfirm'
        msg = cstate.send_enc(msg) + cstate.send_mac(l.MAC_LEN)

        out = syote_comms.comms_process_wrap(commstate, msg)

        self.assertFalse(out)
    @timeout(2)
    async def test_ccode_ecdhe(self):
        # Run LORANode in ecdhe mode against the C implementation
        # (via syote_comms), checking both the commands that get
        # delivered and that a repeated request gets the same reply.
        _self = self
        from ctypes import c_uint8

        # seed the RNG
        prngseed = b'abc123'
        syote_comms.strobe_seed_prng((c_uint8 *
            len(prngseed))(*prngseed), len(prngseed))

        # Create the state for testing
        commstate = syote_comms.CommsState()

        # These are the expected messages and their arguments
        exptmsgs = [
            (CMD_WAITFOR, [ 30 ]),
            (CMD_RUNFOR, [ 1, 50 ]),
            (CMD_PING, [ ]),
            (CMD_TERMINATE, [ ]),
        ]

        def procmsg(msg, outbuf):
            # Callback for decoded commands: checks the command
            # against the head of exptmsgs, and replies w/ the
            # command byte.
            msgbuf = msg._from()
            cmd = msgbuf[0]

            # arguments are consecutive 4 byte little endian ints
            # following the command byte
            args = [ int.from_bytes(msgbuf[x:x + 4],
                byteorder='little') for x in range(1, len(msgbuf),
                4) ]

            if exptmsgs[0] == (cmd, args):
                exptmsgs.pop(0)
                outbuf[0].pkt[0] = cmd
                outbuf[0].pktlen = 1
            else: #pragma: no cover
                raise RuntimeError('cmd not found')

        # wrap the callback function
        cb = syote_comms.process_msgfunc_t(procmsg)

        class CCodeSD(MockSyncDatagram):
            async def runner(self):
                # One expected reply length per message sent by
                # the initiator: two handshake messages followed
                # by the four commands.
                # NOTE(review): presumably 40 is a 32 byte public
                # key + 8 byte MAC, 17 is b'confirmed' + MAC and 9
                # is the command byte + MAC -- confirm against the
                # C code.
                for expectlen in [ 40, 17, 9, 9, 9, 9 ]:
                    # get message
                    inmsg = await self.get()

                    # process the test message
                    out = syote_comms.comms_process_wrap(
                        commstate, inmsg)

                    # make sure the reply matches length
                    _self.assertEqual(expectlen, len(out))

                    # save what was originally replied
                    origmsg = out

                    # pretend that the reply didn't make it
                    out = syote_comms.comms_process_wrap(
                        commstate, inmsg)

                    # make sure that the reply matches
                    # the previous
                    _self.assertEqual(origmsg, out)

                    # pass the reply back
                    await self.put(out)

        # Generate keys
        initkey = X25519.gen()
        respkey = X25519.gen()

        # Initialize everything
        syote_comms.comms_init(commstate, cb, None, make_pktbuf(respkey.getpriv()), make_pktbuf(initkey.getpub()))

        # Create test fixture
        tsd = CCodeSD()
        l = LORANode(tsd, init_key=initkey, resp_pub=respkey.getpub())

        # Send various messages
        await l.start()

        await l.waitfor(30)

        await l.runfor(1, 50)

        await l.ping()

        await l.terminate()

        # wait for the runner to finish, this also passes up any
        # exception that was raised in it
        await tsd.drain()

        # Make sure all messages have been processed
        self.assertTrue(tsd.sendq.empty())
        self.assertTrue(tsd.recvq.empty())

        # Make sure all the expected messages have been
        # processed.
        self.assertFalse(exptmsgs)
        #_debprint('done')
    @timeout(2)
    async def test_ccode(self):
        # Run LORANode in shared key mode against the C
        # implementation (via syote_comms), checking both the
        # commands that get delivered and that a repeated request
        # gets the same reply.
        _self = self
        from ctypes import c_uint8

        # seed the RNG
        prngseed = b'abc123'
        syote_comms.strobe_seed_prng((c_uint8 *
            len(prngseed))(*prngseed), len(prngseed))

        # Create the state for testing
        commstate = syote_comms.CommsState()

        # These are the expected messages and their arguments
        exptmsgs = [
            (CMD_WAITFOR, [ 30 ]),
            (CMD_RUNFOR, [ 1, 50 ]),
            (CMD_PING, [ ]),
            (CMD_TERMINATE, [ ]),
        ]

        def procmsg(msg, outbuf):
            # Callback for decoded commands: checks the command
            # against the head of exptmsgs, and replies w/ the
            # command byte.
            msgbuf = msg._from()
            cmd = msgbuf[0]

            # arguments are consecutive 4 byte little endian ints
            # following the command byte
            args = [ int.from_bytes(msgbuf[x:x + 4],
                byteorder='little') for x in range(1, len(msgbuf),
                4) ]

            if exptmsgs[0] == (cmd, args):
                exptmsgs.pop(0)
                outbuf[0].pkt[0] = cmd
                outbuf[0].pktlen = 1
            else: #pragma: no cover
                raise RuntimeError('cmd not found')

        # wrap the callback function
        cb = syote_comms.process_msgfunc_t(procmsg)

        class CCodeSD(MockSyncDatagram):
            async def runner(self):
                # One expected reply length per message sent by
                # the initiator: two handshake messages followed
                # by the four commands.
                # NOTE(review): presumably 24 is 16 bytes of
                # payload + 8 byte MAC, 17 is b'confirmed' + MAC
                # and 9 is the command byte + MAC -- confirm
                # against the C code.
                for expectlen in [ 24, 17, 9, 9, 9, 9 ]:
                    # get message
                    inmsg = await self.get()

                    # process the test message
                    out = syote_comms.comms_process_wrap(
                        commstate, inmsg)

                    # make sure the reply matches length
                    _self.assertEqual(expectlen, len(out))

                    # save what was originally replied
                    origmsg = out

                    # pretend that the reply didn't make it
                    out = syote_comms.comms_process_wrap(
                        commstate, inmsg)

                    # make sure that the reply matches
                    # the previous
                    _self.assertEqual(origmsg, out)

                    # pass the reply back
                    await self.put(out)

        # Generate shared key
        shared_key = os.urandom(32)

        # Initialize everything
        syote_comms.comms_init(commstate, cb, make_pktbuf(shared_key), None, None)

        # Create test fixture
        tsd = CCodeSD()
        l = LORANode(tsd, shared=shared_key)

        # Send various messages
        await l.start()

        await l.waitfor(30)

        await l.runfor(1, 50)

        await l.ping()

        await l.terminate()

        # wait for the runner to finish, this also passes up any
        # exception that was raised in it
        await tsd.drain()

        # Make sure all messages have been processed
        self.assertTrue(tsd.sendq.empty())
        self.assertTrue(tsd.recvq.empty())

        # Make sure all the expected messages have been
        # processed.
        self.assertFalse(exptmsgs)
        #_debprint('done')
  @timeout(2)
  async def test_ccode_newsession(self):
      '''This test is to make sure that if an existing session
      is running, that a new session can be established, and that
      when it does, the old session becomes inactive.
      '''

      # The runner() methods defined below have their own self, so
      # the test case is kept under another name for their asserts.
      _self = self
      from ctypes import c_uint8

      # Orders the steps shared between the two runner() coroutines.
      # NOTE(review): presumably seq.sync(n) blocks are entered in
      # increasing n; confirm against AsyncSequence.
      seq = AsyncSequence()

      # seed the RNG
      prngseed = b'abc123'
      syote_comms.strobe_seed_prng((c_uint8 *
          len(prngseed))(*prngseed), len(prngseed))

      # Create the state for testing
      commstate = syote_comms.CommsState()

      # These are the expected messages and their arguments
      # 30 and 70 are sent by the first node (70 from inside
      # CCodeSD2.runner()), 40 and the terminate by the second node.
      exptmsgs = [
          (CMD_WAITFOR, [ 30 ]),
          (CMD_WAITFOR, [ 70 ]),
          (CMD_WAITFOR, [ 40 ]),
          (CMD_TERMINATE, [ ]),
      ]

      # Callback handed to comms_init() below.  The first byte of the
      # message is taken as the command and the remaining bytes are
      # read as 4 byte little-endian integers.  When that matches the
      # head of exptmsgs the entry is removed and a one byte reply
      # holding the command is written to outbuf; anything else
      # raises RuntimeError.
      def procmsg(msg, outbuf):
          msgbuf = msg._from()
          cmd = msgbuf[0]
          args = [ int.from_bytes(msgbuf[x:x + 4],
              byteorder='little') for x in range(1, len(msgbuf),
              4) ]

          if exptmsgs[0] == (cmd, args):
              exptmsgs.pop(0)
              outbuf[0].pkt[0] = cmd
              outbuf[0].pktlen = 1
          else: #pragma: no cover
              raise RuntimeError('cmd not found: %d' % cmd)

      # wrap the callback function
      # NOTE(review): presumably a ctypes function type, in which
      # case cb has to stay referenced while commstate is in use;
      # confirm against syote_comms.
      cb = syote_comms.process_msgfunc_t(procmsg)

      # Mixin for the two fixtures below: handles exactly one
      # request.  It uses self.get() and self.put(), which it does
      # not define itself (presumably supplied by MockSyncDatagram).
      class FlipMsg(object):
          async def flipmsg(self):
              # get message
              inmsg = await self.get()

              # process the test message
              out = syote_comms.comms_process_wrap(
                  commstate, inmsg)

              # pass the reply back
              await self.put(out)

      # this class always passes messages, this is
      # used for the first session.
      class CCodeSD1(MockSyncDatagram, FlipMsg):
          async def runner(self):
              # presumably the exchanges produced by l.start() and
              # l.waitfor(30) further down
              for i in range(3):
                  await self.flipmsg()

              async with seq.sync(0):
                  # create bogus message
                  inmsg = b'0'*24

                  # process the bogus message
                  out = syote_comms.comms_process_wrap(
                      commstate, inmsg)

                  # make sure there was not a response
                  _self.assertFalse(out)

              # one more request on the first session; the only
              # remaining sender on it is the l.waitfor(70) call
              # in CCodeSD2.runner()
              await self.flipmsg()

      # this one is special in that it will pause after the first
      # message to ensure that the previous session will continue
      # to work, AND that if a new "new" session comes along, it
      # will override the previous new session that hasn't been
      # confirmed yet.
      class CCodeSD2(MockSyncDatagram, FlipMsg):
          async def runner(self):
              # pass one message from the new session
              async with seq.sync(1):
                  # There might be a missing case
                  # handled for when the confirmed
                  # message is generated, but lost.
                  await self.flipmsg()

              # and the old session is still active
              # (l is the first node, created further down in the
              # enclosing test and looked up when this line runs)
              await l.waitfor(70)

              # the rest of the second session
              async with seq.sync(2):
                  for i in range(3):
                      await self.flipmsg()

      # Generate shared key
      shared_key = os.urandom(32)

      # Initialize everything
      syote_comms.comms_init(commstate, cb, make_pktbuf(shared_key), None, None)

      # Create test fixture
      tsd = CCodeSD1()
      l = LORANode(tsd, shared=shared_key)

      # Send various messages
      await l.start()
      await l.waitfor(30)

      # Ensure that a new one can take over
      # (same shared key, same commstate, separate fixture)
      tsd2 = CCodeSD2()
      l2 = LORANode(tsd2, shared=shared_key)

      # Send various messages
      # Only the second node sends a terminate; the first node's
      # last command is the waitfor(70) issued from
      # CCodeSD2.runner().
      await l2.start()
      await l2.waitfor(40)
      await l2.terminate()

      await tsd.drain()
      await tsd2.drain()

      # Make sure all messages have been processed
      self.assertTrue(tsd.sendq.empty())
      self.assertTrue(tsd.recvq.empty())
      self.assertTrue(tsd2.sendq.empty())
      self.assertTrue(tsd2.recvq.empty())

      # Make sure all the expected messages have been
      # processed.
      self.assertFalse(exptmsgs)
  class TestLoRaNodeMulticast(unittest.IsolatedAsyncioTestCase):
      # Runs a LORANode over MulticastSyncDatagram, with a local task
      # answering through syote_comms.

      # see: https://www.iana.org/assignments/multicast-addresses/multicast-addresses.xhtml#multicast-addresses-1
      # Handed unchanged to MulticastSyncDatagram and to the
      # multicast receiver/transmitter helpers (presumably a
      # (group address, port) pair).
      maddr = ('224.0.0.198', 48542)

      @timeout(2)
      async def test_multisyncdgram(self):
          # Test the implementation of the multicast version of
          # SyncDatagram

          # NOTE(review): _self is not used anywhere in the visible
          # part of this method.
          _self = self
          from ctypes import c_uint8

          # seed the RNG
          prngseed = b'abc123'
          syote_comms.strobe_seed_prng((c_uint8 *
              len(prngseed))(*prngseed), len(prngseed))

          # Create the state for testing
          commstate = syote_comms.CommsState()

          # These are the expected messages and their arguments
          exptmsgs = [
              (CMD_WAITFOR, [ 30 ]),
              (CMD_PING, [ ]),
              (CMD_TERMINATE, [ ]),
          ]

          # Callback handed to comms_init() below.  The first byte
          # of the message is taken as the command and the remaining
          # bytes are read as 4 byte little-endian integers.  When
          # that matches the head of exptmsgs the entry is removed
          # and a one byte reply holding the command is written to
          # outbuf; anything else raises RuntimeError.
          def procmsg(msg, outbuf):
              msgbuf = msg._from()
              cmd = msgbuf[0]
              args = [ int.from_bytes(msgbuf[x:x + 4],
                  byteorder='little') for x in range(1, len(msgbuf),
                  4) ]

              if exptmsgs[0] == (cmd, args):
                  exptmsgs.pop(0)
                  outbuf[0].pkt[0] = cmd
                  outbuf[0].pktlen = 1
              else: #pragma: no cover
                  raise RuntimeError('cmd not found')

          # wrap the callback function
          # NOTE(review): presumably a ctypes function type, in
          # which case cb has to stay referenced while commstate is
          # in use; confirm against syote_comms.
          cb = syote_comms.process_msgfunc_t(procmsg)

          # Generate shared key
          shared_key = os.urandom(32)

          # Initialize everything
          syote_comms.comms_init(commstate, cb, make_pktbuf(shared_key), None, None)

          # create the object we are testing
          msd = MulticastSyncDatagram(self.maddr)

          # Lines up this method with clienttask() via the
          # simpsync(0) / simpsync(1) calls.
          # NOTE(review): presumably simpsync(n) returns in
          # increasing n; confirm against AsyncSequence.
          seq = AsyncSequence()

          # Plays the other end of the link: every received item has
          # its first element run through comms_process_wrap, and a
          # non-empty result is transmitted back.  It loops until the
          # task is cancelled; the finally clause then closes both
          # endpoints.
          async def clienttask():
              mr = await multicast.create_multicast_receiver(
                  self.maddr)
              mt = await multicast.create_multicast_transmitter(
                  self.maddr)

              try:
                  # make sure the above threads are running
                  await seq.simpsync(0)

                  while True:
                      pkt = await mr.recv()
                      # only the first element of what recv()
                      # returned is used as the message
                      msg = pkt[0]

                      out = syote_comms.comms_process_wrap(
                          commstate, msg)

                      if out:
                          await mt.send(out)
              finally:
                  mr.close()
                  mt.close()

          task = asyncio.create_task(clienttask())

          # start it
          await msd.start()

          # pass it to a node
          l = LORANode(msd, shared=shared_key)

          await seq.simpsync(1)

          # Send various messages
          await l.start()
          await l.waitfor(30)
          await l.ping()
          await l.terminate()

          # shut things down
          # NOTE(review): ln is not read anywhere in the visible
          # code; possibly meant to be l -- confirm.
          ln = None
          msd.close()
          task.cancel()

          # awaiting the cancelled task is expected to raise
          # CancelledError here
          with self.assertRaises(asyncio.CancelledError):
              await task