|
- import argparse
- import asyncio
- import multicast
- import sys
- import unittest
-
- from unittest.mock import patch, AsyncMock, Mock, call
- from lora import timeout, _debprint
-
- DEFAULT_MADDR = ('239.192.76.111', 21089)
-
- async def to_loragw(rp, wtr, ignpkts):
- '''Take a multicast.ReceiverProtocol, and pass the packets
- to the lora gateway on the StreamWriter.
-
- Any packets in the set ignpkts will NOT be sent out to the
- gateway. This is to prevent looping packets back out.
- '''
-
- while True:
- pkt, addr = await rp.recv()
-
- #_debprint('pkt to send:', repr(pkt), repr(ignpkts))
- if pkt in ignpkts:
- ignpkts.remove(pkt)
- continue
-
- wtr.write(b'pkt:%s\n' % pkt.hex().encode('ascii'))
- await wtr.drain()
-
- async def from_loragw(rdr, tp, txpkts):
- '''Take a StreamReader, and pass the received packets from
- the lora gatway to the multicast.TransmitterProtocol.
-
- Each packet that will be transmitted will be added the the txpkts
- set that is passed in. This is to allow a receiver to ignore
- the loop back.
- '''
-
- while True:
- rcv = await rdr.readuntil()
- #_debprint('from gw:', repr(rcv), repr(txpkts))
- rcv = rcv.strip()
-
- if rcv.startswith(b'data:'):
- # we've received a packet
- data = bytes.fromhex(rcv[5:].decode('ascii'))
- txpkts.add(data)
- await tp.send(data)
-
- async def open_dev(fname, *args, **kwargs):
- '''coroutine that returns (reader, writer), in the same way as
- [open_connection](https://docs.python.org/3/library/asyncio-stream.html#asyncio.open_connection)
- does. The args are passed to open.'''
-
- import functools, os, socket
-
- f = open(fname, *args, **kwargs)
-
- f.type = socket.SOCK_STREAM
- f.setblocking = functools.partial(os.set_blocking, f.fileno())
- f.getsockname = lambda: f.name
- f.getpeername = lambda: f.name
- f.family = socket.AF_UNIX
- f.recv = f.read
- f.send = f.write
-
- return await asyncio.open_connection(sock=f)
-
- async def main():
- parser = argparse.ArgumentParser()
-
- parser.add_argument('-a', metavar='maddr', type=str,
- help='multicastip:port to use to send/receive pkts')
- parser.add_argument('serdev', type=str,
- help='device for gateway comms')
-
- args = parser.parse_args()
-
- # open up the gateway device
- reader, writer = await open_dev(args.serdev, 'w+b')
-
- # open up the listener
- mr = await multicast.create_multicast_receiver(DEFAULT_MADDR)
- mt = await multicast.create_multicast_transmitter(DEFAULT_MADDR)
-
- try:
- pkts = set()
- tlgtask = asyncio.create_task(to_loragw(mr, writer, pkts))
- flgtask = asyncio.create_task(from_loragw(reader, mt, pkts))
-
- await asyncio.gather(tlgtask, flgtask)
-
- finally:
- mt.close()
- mr.close()
- writer.close()
-
- if __name__ == '__main__':
- asyncio.run(main())
-
- class TestLoraServ(unittest.IsolatedAsyncioTestCase):
- @timeout(2)
- async def test_from_loragw(self):
- readermock = AsyncMock()
-
- pkts = [ b'astringofdata',
- b'anotherpkt',
- b'makeupdata',
- b'asigo',
- ]
-
- pktset = set()
-
- readermock.readuntil.side_effect = [
- b'bogus data\r\n',
- ] + [ b'data: %s\r\n' % x.hex().encode('ascii') for x in pkts
- ] + [
- b'moreignored\r\n',
- b'rssi: 123\r\n',
- b'txdone\r\n',
- asyncio.IncompleteReadError(partial=b'aa',
- expected=b'\n'),
- ]
-
-
- writermock = AsyncMock()
-
- with self.assertRaises(asyncio.IncompleteReadError):
- await from_loragw(readermock, writermock, pktset)
-
- writermock.send.assert_has_calls([ call(x) for x in pkts ])
-
- self.assertEqual(pktset, set(pkts))
- @timeout(2)
- async def test_to_loragw(self):
- readermock = AsyncMock()
- writermock = AsyncMock()
-
- pkts = [ (x, None) for x in (b'astringofdata',
- b'anotherpkt',
- b'makeupdata',
- b'asigo', )
- ] + [
- asyncio.CancelledError(),
- ]
-
- readermock.recv.side_effect = pkts
- writermock.write = Mock()
-
- txpkts = { pkts[-2][0] }
-
- with self.assertRaises(asyncio.CancelledError):
- await to_loragw(readermock, writermock, txpkts)
-
- # make sure that the ignored packet was dropped
- self.assertFalse(txpkts)
-
- # and that it wasn't transmitted
- self.assertNotIn(call(b'pkt:%s\n' %
- pkts[-2][0].hex().encode('ascii')),
- writermock.write.mock_calls)
-
- writermock.write.assert_has_calls([ call(b'pkt:%s\n' %
- x.hex().encode('ascii')) for x, addr in pkts[:-2] ])
- writermock.drain.assert_has_calls([ call() for x in pkts[:-2]
- ])
-
- @timeout(2)
- async def test_argerrors(self):
- # it'd be nice to silence the usage output here
- with self.assertRaises(SystemExit) as cm, \
- patch.dict(sys.__dict__, dict(argv=[ 'name', ])):
- await main()
-
- self.assertEqual(cm.exception.code, 2)
-
- @timeout(2)
- @patch(__name__ + '.from_loragw')
- @patch(__name__ + '.to_loragw')
- @patch('multicast.create_multicast_receiver')
- @patch('multicast.create_multicast_transmitter')
- @patch(__name__ + '.open_dev')
- async def test_main(self, od, cmt, cmr, tlg, flg):
- # setup various mocks
- cmtret = Mock()
- cmrret = Mock()
- cmt.return_value = cmtret
- cmr.return_value = cmrret
- readermock = Mock()
- writermock = Mock()
- od.return_value = (readermock, writermock)
-
- # make sure that when called w/ an arg
- serdev = 'abc123'
- with patch.dict(sys.__dict__, dict(argv=[ 'name', serdev ])):
- await main()
-
- # that open_dev is called with it
- od.assert_called_with(serdev, 'w+b')
-
- # and that the multicast functions were called
- cmt.assert_called_with(DEFAULT_MADDR)
- cmr.assert_called_with(DEFAULT_MADDR)
-
- # that there was a setobj created
- setobj = tlg.mock_calls[0][1][-1]
- self.assertIsInstance(setobj, set)
-
- # and the same object was passed
- self.assertIs(setobj, flg.mock_calls[0][1][-1])
-
- # and both tasks were passed the correct objects
- tlg.assert_called_with(cmrret, writermock, setobj)
- flg.assert_called_with(readermock, cmtret, setobj)
-
- # and that they were closed in the end
- cmtret.close.assert_called()
- cmrret.close.assert_called()
-
- # and that the writer was closed as well
- writermock.close.assert_called()
|