|
- # Copyright 2021 John-Mark Gurney.
- #
- # Redistribution and use in source and binary forms, with or without
- # modification, are permitted provided that the following conditions
- # are met:
- # 1. Redistributions of source code must retain the above copyright
- # notice, this list of conditions and the following disclaimer.
- # 2. Redistributions in binary form must reproduce the above copyright
- # notice, this list of conditions and the following disclaimer in the
- # documentation and/or other materials provided with the distribution.
- #
- # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
- # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
- # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
- # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
- # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
- # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
- # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
- # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
- # SUCH DAMAGE.
- #
-
- from ctypes import Structure, POINTER, CFUNCTYPE, pointer, sizeof
- from ctypes import c_uint8, c_uint16, c_ssize_t, c_size_t, c_uint64, c_int
- from ctypes import CDLL
-
class StructureRepr(object):
    '''Mixin that gives ctypes structures a readable ``__repr__``.

    The result has the form ``ClassName(field=value, ...)`` with the
    fields listed in the order they appear in the class's ``_fields_``.
    '''

    def __repr__(self): #pragma: no cover
        # Entries of _fields_ are (name, type) pairs, or (name, type, bits)
        # triples for bit fields, so take the name by index instead of
        # unpacking a fixed number of items.
        return '%s(%s)' % (self.__class__.__name__, ', '.join(
            '%s=%s' % (field[0], getattr(self, field[0]))
            for field in self._fields_))
-
class PktBuf(Structure):
    '''A pointer to packet bytes paired with a 16-bit length.'''

    _fields_ = [
        ('pkt', POINTER(c_uint8)),
        ('pktlen', c_uint16),
    ]

    def _from(self):
        '''Return a bytes copy of the first ``pktlen`` bytes at ``pkt``.'''
        length = self.pktlen
        octets = self.pkt[:length]
        return bytes(octets)

    def __repr__(self): #pragma: no cover
        # Show the referenced bytes rather than the raw pointer value.
        contents = repr(self._from())
        return 'PktBuf(pkt=%s, pktlen=%s)' % (contents, self.pktlen)
-
def make_pktbuf(s):
    '''Build a PktBuf describing the bytes-like object s.

    A non-empty bytearray is referenced in place (the PktBuf points at
    the bytearray's own storage).  Any other input, including an empty
    bytearray, is copied into a newly allocated c_uint8 array.

    The returned PktBuf keeps references to the backing objects in its
    _make_pktbuf_ref attribute.

    Raises ValueError if len(s) does not fit in the 16-bit pktlen field.
    '''

    length = len(s)
    if length > 0xffff:
        # pktlen is a c_uint16 and ctypes does no overflow checking, so
        # a larger length would be stored silently truncated.
        raise ValueError('packet too long for PktBuf: %d bytes' % length)

    pb = PktBuf()

    if isinstance(s, bytearray) and length:
        obj = s
        pb.pkt = pointer(c_uint8.from_buffer(s))
    else:
        # c_uint8.from_buffer() needs at least one byte, so an empty
        # bytearray takes the copying path as well.
        obj = (c_uint8 * length)(*s)
        pb.pkt = obj

    pb.pktlen = length

    # Hold on to the backing storage alongside the structure.
    pb._make_pktbuf_ref = (obj, s)

    return pb
-
# Callback prototype: returns nothing, takes a PktBuf by value and a
# pointer to a PktBuf.
process_msgfunc_t = CFUNCTYPE(None, PktBuf, POINTER(PktBuf))

# The native library is optional: _lib is None when it cannot be loaded.
try:
    _lib = CDLL('liblora_test.dylib')
except OSError:
    _lib = None

# Number of 64-bit words used for the strobe state arrays.  Defaults to
# a single word; when the library is available, its reported state size
# in bytes is rounded up to whole 8-byte words.
_strobe_state_u64_cnt = 1
if _lib is not None:
    _lib._strobe_state_size.argtypes = ()
    _lib._strobe_state_size.restype = c_size_t
    _strobe_state_u64_cnt = (_lib._strobe_state_size() + 7) // 8
-
class CommsSession(Structure, StructureRepr):
    '''ctypes layout of a single comms session.

    NOTE(review): presumably mirrors a struct in the C library; the
    field order and sizes must match it -- confirm against the C header.
    '''

    _fields_ = [
        # Storage for the strobe state, as _strobe_state_u64_cnt
        # 64-bit words.
        ('cs_crypto', c_uint64 * _strobe_state_u64_cnt),
        # Session state value, stored as a C int.
        ('cs_state', c_int),
    ]
-
class CommsState(Structure, StructureRepr):
    '''ctypes layout of the overall comms state.

    When the native library is loaded, the module compares
    sizeof(CommsState) against the size the library reports, so the
    field order and sizes here must not be changed casually.
    '''

    _fields_ = [
        # The alignment of these may be off
        ('cs_active', CommsSession),
        ('cs_pending', CommsSession),
        # Strobe state storage, _strobe_state_u64_cnt 64-bit words.
        ('cs_start', c_uint64 * _strobe_state_u64_cnt),
        # Message-processing callback.
        ('cs_procmsg', process_msgfunc_t),

        # Packet descriptors for the previous message and its response.
        ('cs_prevmsg', PktBuf),
        ('cs_prevmsgresp', PktBuf),

        # 64-byte buffers; presumably the storage the two descriptors
        # above point into -- TODO confirm against the C code.
        ('cs_prevmsgbuf', c_uint8 * 64),
        ('cs_prevmsgrespbuf', c_uint8 * 64),
    ]
-
if _lib is not None:
    # Check that the Python-side layout has the same size as the
    # library's own structure before any state is handed to it.
    _lib._comms_state_size.restype = c_size_t
    _lib._comms_state_size.argtypes = ()

    if _lib._comms_state_size() != sizeof(CommsState): # pragma: no cover
        raise RuntimeError('CommsState structure size mismatch!')

    # Declare the prototype of each exported function and publish it as
    # a module-level name (comms_init, comms_process, strobe_seed_prng).
    for func, ret, args in [
        ('comms_init', None, (POINTER(CommsState), process_msgfunc_t,
            POINTER(PktBuf))),
        ('comms_process', None, (POINTER(CommsState), PktBuf,
            POINTER(PktBuf))),
        ('strobe_seed_prng', None, (POINTER(c_uint8), c_ssize_t)),
    ]:
        f = getattr(_lib, func)
        f.restype = ret
        f.argtypes = args
        # Bind through globals(): at module scope locals() happens to be
        # the same dict, but its contents are documented as not to be
        # modified.
        globals()[func] = f
-
def comms_process_wrap(state, input):
    '''Run comms_process on input and return the reply as bytes.

    input is wrapped in a PktBuf with make_pktbuf, and a 64-byte
    bytearray is supplied as the output buffer.  The return value is
    the first pktlen bytes of the output PktBuf after the call.
    '''

    request = make_pktbuf(input)

    # Output buffer for comms_process; presumably the library writes the
    # reply here and updates pktlen -- confirm against the C code.
    reply = make_pktbuf(bytearray(64))

    comms_process(state, request, reply)

    return reply._from()
|