From 7f60466d353ad753d277c633af5d90396255d40a Mon Sep 17 00:00:00 2001 From: John-Mark Gurney Date: Fri, 25 Oct 2019 00:59:26 -0700 Subject: [PATCH] get the responder side of things working, including COMPLETE! shutdown... not fully tested on the various errors... --- ntunnel.py | 70 +++++++++++++++++++++++++++++------------------------- 1 file changed, 38 insertions(+), 32 deletions(-) diff --git a/ntunnel.py b/ntunnel.py index c9de14c..178039a 100644 --- a/ntunnel.py +++ b/ntunnel.py @@ -131,27 +131,24 @@ async def NoiseForwarder(mode, priv_key, rdrwrr, ptsockstr): writer.write(proto.decrypt(tmsg)) await writer.drain() finally: - print('foo') - # XXX - how to test - #writer.write_eof() + writer.write_eof() async def encses(): - while True: - ptmsg = await reader.read(65535 - 16) # largest message - encmsg = proto.encrypt(ptmsg) - wrr.write(enclenfun(encmsg)) - wrr.write(encmsg) - await wrr.drain() - - done, pending = await asyncio.wait((decses(), encses()), return_when=asyncio.FIRST_COMPLETED) - for i in done: - print('v:', repr(await i)) - - done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED) - for i in done: - print('v:', repr(await i)) + try: + while True: + ptmsg = await reader.read(65535 - 16) # largest message + if not ptmsg: + # eof + return 'enc' + + encmsg = proto.encrypt(ptmsg) + wrr.write(enclenfun(encmsg)) + wrr.write(encmsg) + await wrr.drain() + finally: + wrr.write_eof() - return done + return asyncio.gather(decses(), encses()) class TestListenSocket(unittest.TestCase): def test_listensockstr(self): @@ -201,15 +198,19 @@ class Tests(unittest.TestCase): @async_test async def test_server(self): + # Test is plumbed: + # (reader, writer) -> servsock -> + # (rdr, wrr) NoiseForward (reader, writer) -> + # servptsock -> (ptsock[0], ptsock[1]) # Path that the server will sit on servsockpath = os.path.join(self.tempdir, 'servsock') servarg = _makeunix(servsockpath) # Path that the server will send pt data to - servsockpath = os.path.join(self.tempdir, 'servptsock') + servptpath = os.path.join(self.tempdir, 'servptsock') # Setup pt target listener - pttarg = _makeunix(servsockpath) + pttarg = _makeunix(servptpath) ptsock = [] def ptsockaccept(reader, writer, ptsock=ptsock): ptsock.append((reader, writer)) @@ -221,14 +222,10 @@ class Tests(unittest.TestCase): event = asyncio.Event() async def runnf(rdr, wrr): - print('a') a = await NoiseForwarder('resp', self.server_key_pair[1], (rdr, wrr), pttarg) - print('b') nfs.append(a) - print('c') event.set() - print('d') # Setup server listener ssock = await listensockstr(servarg, runnf) @@ -275,8 +272,9 @@ class Tests(unittest.TestCase): # XXX - how to sync? await asyncio.sleep(.1) + ptreader, ptwriter = ptsock[0] # read the test message - rptmsg = await ptsock[0][0].readexactly(len(ptmsg)) + rptmsg = await ptreader.readexactly(len(ptmsg)) self.assertEqual(rptmsg, ptmsg) @@ -290,13 +288,13 @@ class Tests(unittest.TestCase): await asyncio.sleep(.1) # read the test message - rptmsg = await ptsock[0][0].readexactly(len(ptmsg)) + rptmsg = await ptreader.readexactly(len(ptmsg)) self.assertEqual(rptmsg, ptmsg) # now try the other way ptmsg = os.urandom(912) - ptsock[0][1].write(ptmsg) + ptwriter.write(ptmsg) # find out how much we need to read encmsg = await reader.readexactly(2 + 16) @@ -309,12 +307,20 @@ class Tests(unittest.TestCase): self.assertEqual(rptmsg, ptmsg) - # shut everything down + # shut down sending writer.write_eof() - #ptsock[0][1].write_eof() - # XXX - how to sync? - await asyncio.sleep(.1) + # so pt reader should be shut down + r = await ptreader.read(1) + self.assertTrue(ptreader.at_eof()) + + # shut down pt + ptwriter.write_eof() + + # make sure the enc reader is eof + r = await reader.read(1) + self.assertTrue(reader.at_eof()) await event.wait() - print(repr(nfs)) + + self.assertEqual(await nfs[0], [ 'dec', 'enc' ])