import argparse |
import asyncio |
import multicast |
import sys |
import unittest |
from unittest.mock import patch, AsyncMock, Mock, call |
from lora import timeout, _debprint |
DEFAULT_MADDR = ('', 21089) |
async def to_loragw(rp, wtr, ignpkts): |
'''Take a multicast.ReceiverProtocol, and pass the packets |
to the lora gateway on the StreamWriter. |
Any packets in the set ignpkts will NOT be sent out to the |
gateway. This is to prevent looping packets back out. |
''' |
while True: |
pkt, addr = await rp.recv() |
#_debprint('pkt to send:', repr(pkt), repr(ignpkts)) |
if pkt in ignpkts: |
ignpkts.remove(pkt) |
continue |
wtr.write(b'pkt:%s\n' % pkt.hex().encode('ascii')) |
await wtr.drain() |
async def from_loragw(rdr, tp, txpkts): |
'''Take a StreamReader, and pass the received packets from |
the lora gatway to the multicast.TransmitterProtocol. |
Each packet that will be transmitted will be added the the txpkts |
set that is passed in. This is to allow a receiver to ignore |
the loop back. |
''' |
while True: |
rcv = await rdr.readuntil() |
#_debprint('from gw:', repr(rcv), repr(txpkts)) |
rcv = rcv.strip() |
if rcv.startswith(b'data:'): |
# we've received a packet |
data = bytes.fromhex(rcv[5:].decode('ascii')) |
txpkts.add(data) |
await tp.send(data) |
async def open_dev(fname, *args, **kwargs): |
'''coroutine that returns (reader, writer), in the same way as |
[open_connection](https://docs.python.org/3/library/asyncio-stream.html#asyncio.open_connection) |
does. The args are passed to open.''' |
import functools, os, socket |
f = open(fname, *args, **kwargs) |
f.type = socket.SOCK_STREAM |
f.setblocking = functools.partial(os.set_blocking, f.fileno()) |
f.getsockname = lambda: f.name |
f.getpeername = lambda: f.name |
f.family = socket.AF_UNIX |
f.recv = f.read |
f.send = f.write |
return await asyncio.open_connection(sock=f) |
async def main(): |
parser = argparse.ArgumentParser() |
parser.add_argument('-a', metavar='maddr', type=str, |
help='multicastip:port to use to send/receive pkts') |
parser.add_argument('serdev', type=str, |
help='device for gateway comms') |
args = parser.parse_args() |
# open up the gateway device |
reader, writer = await open_dev(args.serdev, 'w+b') |
# open up the listener |
mr = await multicast.create_multicast_receiver(DEFAULT_MADDR) |
mt = await multicast.create_multicast_transmitter(DEFAULT_MADDR) |
try: |
pkts = set() |
tlgtask = asyncio.create_task(to_loragw(mr, writer, pkts)) |
flgtask = asyncio.create_task(from_loragw(reader, mt, pkts)) |
await asyncio.gather(tlgtask, flgtask) |
finally: |
mt.close() |
mr.close() |
writer.close() |
if __name__ == '__main__': |
asyncio.run(main()) |
class TestLoraServ(unittest.IsolatedAsyncioTestCase): |
@timeout(2) |
async def test_from_loragw(self): |
readermock = AsyncMock() |
pkts = [ b'astringofdata', |
b'anotherpkt', |
b'makeupdata', |
b'asigo', |
] |
pktset = set() |
readermock.readuntil.side_effect = [ |
b'bogus data\r\n', |
] + [ b'data: %s\r\n' % x.hex().encode('ascii') for x in pkts |
] + [ |
b'moreignored\r\n', |
b'rssi: 123\r\n', |
b'txdone\r\n', |
asyncio.IncompleteReadError(partial=b'aa', |
expected=b'\n'), |
] |
writermock = AsyncMock() |
with self.assertRaises(asyncio.IncompleteReadError): |
await from_loragw(readermock, writermock, pktset) |
writermock.send.assert_has_calls([ call(x) for x in pkts ]) |
self.assertEqual(pktset, set(pkts)) |
@timeout(2) |
async def test_to_loragw(self): |
readermock = AsyncMock() |
writermock = AsyncMock() |
pkts = [ (x, None) for x in (b'astringofdata', |
b'anotherpkt', |
b'makeupdata', |
b'asigo', ) |
] + [ |
asyncio.CancelledError(), |
] |
readermock.recv.side_effect = pkts |
writermock.write = Mock() |
txpkts = { pkts[-2][0] } |
with self.assertRaises(asyncio.CancelledError): |
await to_loragw(readermock, writermock, txpkts) |
# make sure that the ignored packet was dropped |
self.assertFalse(txpkts) |
# and that it wasn't transmitted |
self.assertNotIn(call(b'pkt:%s\n' % |
pkts[-2][0].hex().encode('ascii')), |
writermock.write.mock_calls) |
writermock.write.assert_has_calls([ call(b'pkt:%s\n' % |
x.hex().encode('ascii')) for x, addr in pkts[:-2] ]) |
writermock.drain.assert_has_calls([ call() for x in pkts[:-2] |
]) |
@timeout(2) |
async def test_argerrors(self): |
# it'd be nice to silence the usage output here |
with self.assertRaises(SystemExit) as cm, \ |
patch.dict(sys.__dict__, dict(argv=[ 'name', ])): |
await main() |
self.assertEqual(cm.exception.code, 2) |
@timeout(2) |
@patch(__name__ + '.from_loragw') |
@patch(__name__ + '.to_loragw') |
@patch('multicast.create_multicast_receiver') |
@patch('multicast.create_multicast_transmitter') |
@patch(__name__ + '.open_dev') |
async def test_main(self, od, cmt, cmr, tlg, flg): |
# setup various mocks |
cmtret = Mock() |
cmrret = Mock() |
cmt.return_value = cmtret |
cmr.return_value = cmrret |
readermock = Mock() |
writermock = Mock() |
od.return_value = (readermock, writermock) |
# make sure that when called w/ an arg |
serdev = 'abc123' |
with patch.dict(sys.__dict__, dict(argv=[ 'name', serdev ])): |
await main() |
# that open_dev is called with it |
od.assert_called_with(serdev, 'w+b') |
# and that the multicast functions were called |
cmt.assert_called_with(DEFAULT_MADDR) |
cmr.assert_called_with(DEFAULT_MADDR) |
# that there was a setobj created |
setobj = tlg.mock_calls[0][1][-1] |
self.assertIsInstance(setobj, set) |
# and the same object was passed |
self.assertIs(setobj, flg.mock_calls[0][1][-1]) |
# and both tasks were passed the correct objects |
tlg.assert_called_with(cmrret, writermock, setobj) |
flg.assert_called_with(readermock, cmtret, setobj) |
# and that they were closed in the end |
cmtret.close.assert_called() |
cmrret.close.assert_called() |
# and that the writer was closed as well |
writermock.close.assert_called() |