|
|
@@ -562,10 +562,97 @@ class TestLORANode(unittest.IsolatedAsyncioTestCase): |
|
|
|
self.assertTrue(tsd.recvq.empty()) |
|
|
|
#_debprint('done') |
|
|
|
|
|
|
|
@timeout(2) |
|
|
|
async def test_ccode_badmsgs(self): |
|
|
|
# Test to make sure that various bad messages in the |
|
|
|
# handshake process are rejected even if the attacker |
|
|
|
# has the correct key. This just keeps the protocol |
|
|
|
# tight allowing for variations in the future. |
|
|
|
|
|
|
|
# seed the RNG |
|
|
|
prngseed = b'abc123' |
|
|
|
from ctypes import c_uint8 |
|
|
|
lora_comms.strobe_seed_prng((c_uint8 * |
|
|
|
len(prngseed))(*prngseed), len(prngseed)) |
|
|
|
|
|
|
|
# Create the state for testing |
|
|
|
commstate = lora_comms.CommsState() |
|
|
|
|
|
|
|
# dummy callback |
|
|
|
def procmsg(msg, outbuf): |
|
|
|
pass |
|
|
|
|
|
|
|
cb = lora_comms.process_msgfunc_t(procmsg) |
|
|
|
|
|
|
|
# Generate shared key |
|
|
|
shared_key = os.urandom(32) |
|
|
|
|
|
|
|
# Initialize everything |
|
|
|
lora_comms.comms_init(commstate, cb, make_pktbuf(shared_key)) |
|
|
|
|
|
|
|
# Create test fixture, only use it to init crypto state |
|
|
|
tsd = SyncDatagram() |
|
|
|
l = LORANode(tsd, shared=shared_key) |
|
|
|
|
|
|
|
# copy the crypto state |
|
|
|
cstate = l.st.copy() |
|
|
|
|
|
|
|
# compose an incorrect init message |
|
|
|
msg = os.urandom(16) + b'othre' |
|
|
|
msg = cstate.send_enc(msg) + cstate.send_mac(l.MAC_LEN) |
|
|
|
|
|
|
|
out = lora_comms.comms_process_wrap(commstate, msg) |
|
|
|
|
|
|
|
self.assertFalse(out) |
|
|
|
|
|
|
|
# copy the crypto state |
|
|
|
cstate = l.st.copy() |
|
|
|
|
|
|
|
# compose an incorrect init message |
|
|
|
msg = os.urandom(16) + b' eqreset' |
|
|
|
msg = cstate.send_enc(msg) + cstate.send_mac(l.MAC_LEN) |
|
|
|
|
|
|
|
out = lora_comms.comms_process_wrap(commstate, msg) |
|
|
|
|
|
|
|
self.assertFalse(out) |
|
|
|
|
|
|
|
# compose the correct init message |
|
|
|
msg = os.urandom(16) + b'reqreset' |
|
|
|
msg = l.st.send_enc(msg) + l.st.send_mac(l.MAC_LEN) |
|
|
|
|
|
|
|
out = lora_comms.comms_process_wrap(commstate, msg) |
|
|
|
|
|
|
|
l.st.recv_enc(out[:-l.MAC_LEN]) |
|
|
|
l.st.recv_mac(out[-l.MAC_LEN:]) |
|
|
|
|
|
|
|
l.st.ratchet() |
|
|
|
|
|
|
|
# copy the crypto state |
|
|
|
cstate = l.st.copy() |
|
|
|
|
|
|
|
# compose an incorrect confirmed message |
|
|
|
msg = b'onfirm' |
|
|
|
msg = cstate.send_enc(msg) + cstate.send_mac(l.MAC_LEN) |
|
|
|
|
|
|
|
out = lora_comms.comms_process_wrap(commstate, msg) |
|
|
|
|
|
|
|
self.assertFalse(out) |
|
|
|
|
|
|
|
# copy the crypto state |
|
|
|
cstate = l.st.copy() |
|
|
|
|
|
|
|
# compose an incorrect confirmed message |
|
|
|
msg = b' onfirm' |
|
|
|
msg = cstate.send_enc(msg) + cstate.send_mac(l.MAC_LEN) |
|
|
|
|
|
|
|
out = lora_comms.comms_process_wrap(commstate, msg) |
|
|
|
|
|
|
|
self.assertFalse(out) |
|
|
|
|
|
|
|
@timeout(2) |
|
|
|
async def test_ccode(self): |
|
|
|
_self = self |
|
|
|
from ctypes import pointer, sizeof, c_uint8 |
|
|
|
from ctypes import c_uint8 |
|
|
|
|
|
|
|
# seed the RNG |
|
|
|
prngseed = b'abc123' |
|
|
@@ -675,7 +762,7 @@ class TestLORANode(unittest.IsolatedAsyncioTestCase): |
|
|
|
''' |
|
|
|
|
|
|
|
_self = self |
|
|
|
from ctypes import pointer, sizeof, c_uint8 |
|
|
|
from ctypes import c_uint8 |
|
|
|
|
|
|
|
seq = AsyncSequence() |
|
|
|
|
|
|
|