|
- import asyncio
- import functools
- import os
- import unittest
-
- from Strobe.Strobe import Strobe
- from Strobe.Strobe import AuthenticationFailed
-
- domain = b'com.funkthat.lora.irrigation.shared.v0.0.1'
-
- # Response to command will be the CMD and any arguments if needed.
- # The command is encoded as an unsigned byte
- CMD_TERMINATE = 1 # no args: terminate the sesssion, reply confirms
-
- # The follow commands are queue up, but will be acknoledged when queued
- CMD_WAITFOR = 2 # arg: (length): waits for length seconds
- CMD_RUNFOR = 3 # arg: (chan, length): turns on chan for length seconds
-
- class LORANode(object):
- '''Implement a LORANode initiator.'''
-
- def __init__(self, syncdatagram):
- self.sd = syncdatagram
- self.st = Strobe(domain)
-
- async def start(self):
- msg = self.st.send_enc(os.urandom(16) + b'reqreset') + \
- self.st.send_mac(8)
-
- resp = await self.sd.sendtillrecv(msg, 1)
-
- self.st.recv_enc(resp[:16])
- self.st.recv_mac(resp[16:])
-
- resp = await self.sd.sendtillrecv(
- self.st.send_enc(b'confirm') + self.st.send_mac(8), 1)
-
- pkt = self.st.recv_enc(resp[:9])
- self.st.recv_mac(resp[9:])
-
- if pkt != b'confirmed':
- raise RuntimeError
-
- @staticmethod
- def _encodeargs(*args):
- r = []
- for i in args:
- r.append(i.to_bytes(4, byteorder='little'))
-
- return b''.join(r)
-
- async def _sendcmd(self, cmd, *args):
- cmdbyte = cmd.to_bytes(1, byteorder='little')
- pkt = await self.sd.sendtillrecv(
- self.st.send_enc(cmdbyte +
- self._encodeargs(*args)) + self.st.send_mac(8), 1)
-
- resp = self.st.recv_enc(pkt[:-8])
- self.st.recv_mac(pkt[-8:])
-
- if resp[0:1] != cmdbyte:
- raise RuntimeError('response does not match, got: %s, expected: %s' % (repr(resp[0:1]), repr(cmdbyte)))
-
- async def waitfor(self, length):
- return await self._sendcmd(CMD_WAITFOR, length)
-
- async def runfor(self, chan, length):
- return await self._sendcmd(CMD_RUNFOR, chan, length)
-
- async def terminate(self):
- return await self._sendcmd(CMD_TERMINATE)
-
- class SyncDatagram(object):
- '''Base interface for a more simple synchronous interface.'''
-
- def __init__(self): #pragma: no cover
- pass
-
- async def recv(self, timeout=None): #pragma: no cover
- '''Receive a datagram. If timeout is not None, wait that many
- seconds, and if nothing is received in that time, raise an TimeoutError
- exception.'''
-
- raise NotImplementedError
-
- async def send(self, data): #pragma: no cover
- '''Send a datagram.'''
-
- raise NotImplementedError
-
- async def sendtillrecv(self, data, freq):
- '''Send the datagram in data, every freq seconds until a datagram
- is received. If timeout seconds happen w/o receiving a datagram,
- then raise an TimeoutError exception.'''
-
- while True:
- await self.send(data)
- try:
- return await self.recv(freq)
- except TimeoutError:
- pass
-
- class MockSyncDatagram(SyncDatagram):
- '''A testing version of SyncDatagram. Define a method runner which
- implements part of the sequence. In the function, await on either
- self.get, to wait for the other side to send something, or await
- self.put w/ data to send.'''
-
- def __init__(self):
- self.sendq = asyncio.Queue()
- self.recvq = asyncio.Queue()
- self.task = None
- self.task = asyncio.create_task(self.runner())
-
- self.get = self.sendq.get
- self.put = self.recvq.put
-
- async def drain(self):
- '''Wait for the runner thread to finish up.'''
-
- return await self.task
-
- async def runner(self): #pragma: no cover
- raise NotImplementedError
-
- async def recv(self, timeout=None):
- return await self.recvq.get()
-
- async def send(self, data):
- return await self.sendq.put(data)
-
- def __del__(self): #pragma: no cover
- if self.task is not None and not self.task.done():
- self.task.cancel()
-
- class TestSyncData(unittest.IsolatedAsyncioTestCase):
- async def test_syncsendtillrecv(self):
- class MySync(SyncDatagram):
- def __init__(self):
- self.sendq = []
- self.resp = [ TimeoutError(), b'a' ]
-
- async def recv(self, timeout=None):
- assert timeout == 1
- r = self.resp.pop(0)
- if isinstance(r, Exception):
- raise r
-
- return r
-
- async def send(self, data):
- self.sendq.append(data)
-
- ms = MySync()
-
- r = await ms.sendtillrecv(b'foo', 1)
-
- self.assertEqual(r, b'a')
- self.assertEqual(ms.sendq, [ b'foo', b'foo' ])
-
- def timeout(timeout):
- def timeout_wrapper(fun):
- @functools.wraps(fun)
- async def wrapper(*args, **kwargs):
- return await asyncio.wait_for(fun(*args, **kwargs),
- timeout)
-
- return wrapper
-
- return timeout_wrapper
-
- class TestLORANode(unittest.IsolatedAsyncioTestCase):
- @timeout(2)
- async def test_lora(self):
- class TestSD(MockSyncDatagram):
- async def runner(self):
- l = Strobe(domain)
-
- # start handshake
- r = await self.get()
-
- pkt = l.recv_enc(r[:-8])
- l.recv_mac(r[-8:])
-
- assert pkt.endswith(b'reqreset')
-
- await self.put(l.send_enc(os.urandom(16)) +
- l.send_mac(8))
-
- r = await self.get()
- c = l.recv_enc(r[:-8])
- l.recv_mac(r[-8:])
-
- assert c == b'confirm'
-
- await self.put(l.send_enc(b'confirmed') +
- l.send_mac(8))
-
- r = await self.get()
- cmd = l.recv_enc(r[:-8])
- l.recv_mac(r[-8:])
-
- assert cmd[0] == CMD_WAITFOR
- assert int.from_bytes(cmd[1:], byteorder='little') == 30
-
- await self.put(l.send_enc(cmd[0:1]) +
- l.send_mac(8))
-
- r = await self.get()
- cmd = l.recv_enc(r[:-8])
- l.recv_mac(r[-8:])
-
- assert cmd[0] == CMD_RUNFOR
- assert int.from_bytes(cmd[1:5], byteorder='little') == 1
- assert int.from_bytes(cmd[5:], byteorder='little') == 50
-
- await self.put(l.send_enc(cmd[0:1]) +
- l.send_mac(8))
-
- r = await self.get()
- cmd = l.recv_enc(r[:-8])
- l.recv_mac(r[-8:])
-
- assert cmd[0] == CMD_TERMINATE
-
- await self.put(l.send_enc(cmd[0:1]) +
- l.send_mac(8))
-
- tsd = TestSD()
- l = LORANode(tsd)
-
- await l.start()
-
- await l.waitfor(30)
-
- await l.runfor(1, 50)
-
- await l.terminate()
-
- await tsd.drain()
-
- # Make sure all messages have been processed
- self.assertTrue(tsd.sendq.empty())
- self.assertTrue(tsd.recvq.empty())
-
- print('done')
|