|
- from ctypes import Structure, POINTER, CFUNCTYPE, pointer
- from ctypes import c_uint8, c_uint16, c_ssize_t, c_size_t, c_uint64, c_int
- from ctypes import CDLL
-
class PktBuf(Structure):
    """ctypes structure pairing a byte pointer with a 16-bit length."""

    _fields_ = [
        ('pkt', POINTER(c_uint8)),
        ('pktlen', c_uint16),
    ]

    def _from(self):
        """Return a bytes copy of the first ``pktlen`` bytes behind ``pkt``."""
        length = self.pktlen
        return bytes(self.pkt[:length])

    def __repr__(self): #pragma: no cover
        """Show the payload bytes together with the stored length."""
        payload = repr(self._from())
        return 'PktBuf(pkt=%s, pktlen=%s)' % (payload, self.pktlen)
-
def make_pktbuf(s):
    """Build a PktBuf whose ``pkt``/``pktlen`` describe the bytes of *s*.

    A bytearray is wrapped in place through the buffer protocol, so the
    returned PktBuf points at the bytearray's own storage.  Any other
    sequence of byte values (e.g. ``bytes``) is copied into a newly
    allocated ``c_uint8`` array.

    The backing objects are stored on the result as ``_make_pktbuf_ref``
    so they are referenced for as long as the PktBuf is.

    Raises:
        ValueError: if ``len(s)`` does not fit in the ``c_uint16``
            ``pktlen`` field.
    """
    length = len(s)
    if length > 0xFFFF:
        # ctypes integer fields do no overflow checking; without this
        # guard pktlen would silently wrap and disagree with the buffer.
        raise ValueError(
            'packet of %d bytes does not fit in a 16-bit pktlen' % length)

    array_type = c_uint8 * length
    if isinstance(s, bytearray):
        # An array-typed from_buffer() shares the bytearray's memory and,
        # unlike c_uint8.from_buffer(), also accepts an empty bytearray.
        obj = array_type.from_buffer(s)
    else:
        obj = array_type(*s)

    pb = PktBuf()
    pb.pkt = obj
    pb.pktlen = length

    # Hold on to both the ctypes array and the caller's object.
    pb._make_pktbuf_ref = (obj, s)

    return pb
-
# C callback prototype: returns nothing, receives a PktBuf by value and a
# pointer to a PktBuf.
process_msgfunc_t = CFUNCTYPE(None, PktBuf, POINTER(PktBuf))

# Shared library that provides the functions bound in this module.
_lib = CDLL('liblora_test.dylib')

_lib._strobe_state_size.argtypes = ()
_lib._strobe_state_size.restype = c_size_t

# Number of 64-bit words needed to hold _strobe_state_size() bytes
# (ceiling division by 8).
_strobe_state_u64_cnt = -(-_lib._strobe_state_size() // 8)
-
class CommsState(Structure):
    """ctypes layout of the state object handed to the comms_* functions.

    NOTE(review): the field order and sizes presumably mirror a C struct
    in the library -- confirm against the C header before relying on it.
    """

    _fields_ = [
        # The alignment of these may be off
        ('cs_state', c_uint64 * _strobe_state_u64_cnt),
        ('cs_comm_state', c_int),
        ('cs_start', c_uint64 * _strobe_state_u64_cnt),
        ('cs_procmsg', process_msgfunc_t),

        # Two PktBuf descriptors followed by two fixed 64-byte arrays.
        ('cs_prevmsg', PktBuf),
        ('cs_prevmsgresp', PktBuf),

        ('cs_prevmsgbuf', c_uint8 * 64),
        ('cs_prevmsgrespbuf', c_uint8 * 64),
    ]
-
# Declare the return and argument types of each library function and
# publish the configured function object as a module-level name.
for func, ret, args in [
    ('comms_init', None, (POINTER(CommsState), process_msgfunc_t, POINTER(PktBuf))),
    ('comms_process', None, (POINTER(CommsState), PktBuf, POINTER(PktBuf))),
    ('strobe_seed_prng', None, (POINTER(c_uint8), c_ssize_t)),
]:
    f = getattr(_lib, func)
    f.restype = ret
    f.argtypes = args
    # Bind through globals(): writing to locals() only takes effect when
    # this code happens to run at module scope.
    globals()[func] = f
|