From 56897ac5b94daae8fdf30ca683077190e8c22ffb Mon Sep 17 00:00:00 2001 From: John-Mark Gurney Date: Tue, 4 May 2021 01:47:09 -0700 Subject: [PATCH] add a server/gateway between multicast and the lora TX/RX... --- Makefile | 4 +- loraserv.py | 222 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 224 insertions(+), 2 deletions(-) create mode 100644 loraserv.py diff --git a/Makefile b/Makefile index f855762..f2a8466 100644 --- a/Makefile +++ b/Makefile @@ -168,8 +168,8 @@ runbuild: $(SRCS) for i in $(.MAKEFILE_LIST) $(.ALLSRC) $$(cat $(DEPENDS) | gsed ':x; /\\$$/ { N; s/\\\n//; tx }' | sed -e 's/^[^:]*://'); do if [ "$$i" != ".." ]; then echo $$i; fi; done | entr -d sh -c 'echo starting...; cd $(.CURDIR) && $(MAKE) $(.MAKEFLAGS) depend && $(MAKE) $(.MAKEFLAGS) all' .PHONY: runtests -runtests: Makefile lora_comms.py lora.py multicast.py $(LIBLORA_TEST) $(LIBLORA_TEST_SRCS) - ls $(.ALLSRC) | entr sh -c '(cd $(.CURDIR) && $(MAKE) $(.MAKEFLAGS) $(LIBLORA_TEST)) && ((PYTHONPATH="$(.CURDIR)" python -m coverage run -m unittest lora multicast && coverage report --omit=$(.CURDIR)/p/\* -m -i) 2>&1 | head -n 30)' +runtests: Makefile lora_comms.py lora.py loraserv.py multicast.py $(LIBLORA_TEST) $(LIBLORA_TEST_SRCS) + ls $(.ALLSRC) | entr sh -c '(cd $(.CURDIR) && $(MAKE) $(.MAKEFLAGS) $(LIBLORA_TEST)) && ((PYTHONPATH="$(.CURDIR)" python -m coverage run -m unittest lora multicast loraserv && coverage report --omit=$(.CURDIR)/p/\* -m -i) 2>&1 | head -n 30)' # native objects .SUFFIXES: .no diff --git a/loraserv.py b/loraserv.py new file mode 100644 index 0000000..38c3747 --- /dev/null +++ b/loraserv.py @@ -0,0 +1,222 @@ +import argparse +import asyncio +import multicast +import sys +import unittest + +from unittest.mock import patch, AsyncMock, Mock, call +from lora import timeout, _debprint + +DEFAULT_MADDR = ('239.192.76.111', 21089) + +async def to_loragw(rp, wtr, ignpkts): + '''Take a multicast.ReceiverProtocol, and pass the packets + to the lora gateway on the StreamWriter. + + Any packets in the set ignpkts will NOT be sent out to the + gateway. This is to prevent looping packets back out. + ''' + + while True: + pkt, addr = await rp.recv() + + #_debprint('pkt to send:', repr(pkt), repr(ignpkts)) + if pkt in ignpkts: + ignpkts.remove(pkt) + continue + + wtr.write(b'pkt:%s\n' % pkt.hex().encode('ascii')) + await wtr.drain() + +async def from_loragw(rdr, tp, txpkts): + '''Take a StreamReader, and pass the received packets from + the lora gatway to the multicast.TransmitterProtocol. + + Each packet that will be transmitted will be added the the txpkts + set that is passed in. This is to allow a receiver to ignore + the loop back. + ''' + + while True: + rcv = await rdr.readuntil() + #_debprint('from gw:', repr(rcv), repr(txpkts)) + rcv = rcv.strip() + + if rcv.startswith(b'data:'): + # we've received a packet + data = bytes.fromhex(rcv[5:].decode('ascii')) + txpkts.add(data) + await tp.send(data) + +async def open_dev(fname, *args, **kwargs): + '''coroutine that returns (reader, writer), in the same way as + [open_connection](https://docs.python.org/3/library/asyncio-stream.html#asyncio.open_connection) + does. The args are passed to open.''' + + import functools, os, socket + + f = open(fname, *args, **kwargs) + + f.type = socket.SOCK_STREAM + f.setblocking = functools.partial(os.set_blocking, f.fileno()) + f.getsockname = lambda: f.name + f.getpeername = lambda: f.name + f.family = socket.AF_UNIX + f.recv = f.read + f.send = f.write + + return await asyncio.open_connection(sock=f) + +async def main(): + parser = argparse.ArgumentParser() + + parser.add_argument('-a', metavar='maddr', type=str, + help='multicastip:port to use to send/receive pkts') + parser.add_argument('serdev', type=str, + help='device for gateway comms') + + args = parser.parse_args() + + # open up the gateway device + reader, writer = await open_dev(args.serdev, 'w+b') + + # open up the listener + mr = await multicast.create_multicast_receiver(DEFAULT_MADDR) + mt = await multicast.create_multicast_transmitter(DEFAULT_MADDR) + + try: + pkts = set() + tlgtask = asyncio.create_task(to_loragw(mr, writer, pkts)) + flgtask = asyncio.create_task(from_loragw(reader, mt, pkts)) + + await asyncio.gather(tlgtask, flgtask) + + finally: + mt.close() + mr.close() + writer.close() + +if __name__ == '__main__': + asyncio.run(main()) + +class TestLoraServ(unittest.IsolatedAsyncioTestCase): + @timeout(2) + async def test_from_loragw(self): + readermock = AsyncMock() + + pkts = [ b'astringofdata', + b'anotherpkt', + b'makeupdata', + b'asigo', + ] + + pktset = set() + + readermock.readuntil.side_effect = [ + b'bogus data\r\n', + ] + [ b'data: %s\r\n' % x.hex().encode('ascii') for x in pkts + ] + [ + b'moreignored\r\n', + b'rssi: 123\r\n', + b'txdone\r\n', + asyncio.IncompleteReadError(partial=b'aa', + expected=b'\n'), + ] + + + writermock = AsyncMock() + + with self.assertRaises(asyncio.IncompleteReadError): + await from_loragw(readermock, writermock, pktset) + + writermock.send.assert_has_calls([ call(x) for x in pkts ]) + + self.assertEqual(pktset, set(pkts)) + @timeout(2) + async def test_to_loragw(self): + readermock = AsyncMock() + writermock = AsyncMock() + + pkts = [ (x, None) for x in (b'astringofdata', + b'anotherpkt', + b'makeupdata', + b'asigo', ) + ] + [ + asyncio.CancelledError(), + ] + + readermock.recv.side_effect = pkts + writermock.write = Mock() + + txpkts = { pkts[-2][0] } + + with self.assertRaises(asyncio.CancelledError): + await to_loragw(readermock, writermock, txpkts) + + # make sure that the ignored packet was dropped + self.assertFalse(txpkts) + + # and that it wasn't transmitted + self.assertNotIn(call(b'pkt:%s\n' % + pkts[-2][0].hex().encode('ascii')), + writermock.write.mock_calls) + + writermock.write.assert_has_calls([ call(b'pkt:%s\n' % + x.hex().encode('ascii')) for x, addr in pkts[:-2] ]) + writermock.drain.assert_has_calls([ call() for x in pkts[:-2] + ]) + + @timeout(2) + async def test_argerrors(self): + # it'd be nice to silence the usage output here + with self.assertRaises(SystemExit) as cm, \ + patch.dict(sys.__dict__, dict(argv=[ 'name', ])): + await main() + + self.assertEqual(cm.exception.code, 2) + + @timeout(2) + @patch(__name__ + '.from_loragw') + @patch(__name__ + '.to_loragw') + @patch('multicast.create_multicast_receiver') + @patch('multicast.create_multicast_transmitter') + @patch(__name__ + '.open_dev') + async def test_main(self, od, cmt, cmr, tlg, flg): + # setup various mocks + cmtret = Mock() + cmrret = Mock() + cmt.return_value = cmtret + cmr.return_value = cmrret + readermock = Mock() + writermock = Mock() + od.return_value = (readermock, writermock) + + # make sure that when called w/ an arg + serdev = 'abc123' + with patch.dict(sys.__dict__, dict(argv=[ 'name', serdev ])): + await main() + + # that open_dev is called with it + od.assert_called_with(serdev, 'w+b') + + # and that the multicast functions were called + cmt.assert_called_with(DEFAULT_MADDR) + cmr.assert_called_with(DEFAULT_MADDR) + + # that there was a setobj created + setobj = tlg.mock_calls[0][1][-1] + self.assertIsInstance(setobj, set) + + # and the same object was passed + self.assertIs(setobj, flg.mock_calls[0][1][-1]) + + # and both tasks were passed the correct objects + tlg.assert_called_with(cmrret, writermock, setobj) + flg.assert_called_with(readermock, cmtret, setobj) + + # and that they were closed in the end + cmtret.close.assert_called() + cmrret.close.assert_called() + + # and that the writer was closed as well + writermock.close.assert_called()