commit 005d1f48fa04e7989789f989874dc77ad482e993 Author: John-Mark Gurney Date: Mon Oct 21 00:43:22 2019 -0700 first code that implements a simple echo server for noise in twisted diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..48184de --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +.coverage +p +__pycache__ +_trial_temp diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..44b7237 --- /dev/null +++ b/Makefile @@ -0,0 +1,9 @@ +VIRTUALENV ?= virtualenv + +MODULES=twistednoise.py + +test: + (echo $(MODULES) | entr sh -c 'python -m coverage run -m twisted.trial $(basename $(MODULES)) && coverage report --omit=p/\* -m -i') + +env: + ($(VIRTUALENV) p && . ./p/bin/activate && pip install -r requirements.txt) diff --git a/NOTES.md b/NOTES.md new file mode 100644 index 0000000..e977b06 --- /dev/null +++ b/NOTES.md @@ -0,0 +1 @@ +[TDD with Twisted](https://twistedmatrix.com/documents/current/core/howto/trial.html) diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..41e7216 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,3 @@ +coverage +-e git+https://github.com/jmgurney/noiseprotocol.git@ab6f8ebe0e28f5a4105928c13baddcfdc43b7e82#egg=noiseprotocol +twisted diff --git a/twistednoise.py b/twistednoise.py new file mode 100644 index 0000000..3ca9c73 --- /dev/null +++ b/twistednoise.py @@ -0,0 +1,111 @@ +from twisted.trial import unittest +from twisted.test import proto_helpers +from noise.connection import NoiseConnection, Keypair +from twisted.internet.protocol import Factory +# XXX - shouldn't need to access the underlying primitives, but that's what +# noiseprotocol module requires. +from cryptography.hazmat.primitives.asymmetric import x448 +from cryptography.hazmat.primitives import serialization + +# Notes: +# Using XK, so that the connecting party's identity is hidden and that the +# server's party's key is known. + +import twisted.internet.protocol + +def genkeypair(): + '''Generates a keypair, and returns a tuple of (public, private). + They are encoded as raw bytes, and sutible for use w/ Noise.''' + + key = x448.X448PrivateKey.generate() + + enc = serialization.Encoding.Raw + pubformat = serialization.PublicFormat.Raw + privformat = serialization.PrivateFormat.Raw + encalgo = serialization.NoEncryption() + + pub = key.public_key().public_bytes(encoding=enc, format=pubformat) + priv = key.private_bytes(encoding=enc, format=privformat, encryption_algorithm=encalgo) + + return pub, priv + +class TwistedNoiseServerProtocol(twisted.internet.protocol.Protocol): + def connectionMade(self): + # Initialize Noise + noise = NoiseConnection.from_name(b'Noise_XK_448_ChaChaPoly_SHA256') + self.noise = noise + noise.set_as_responder() + noise.set_keypair_from_private_bytes(Keypair.STATIC, self.factory.server_key) + + # Start Handshake + noise.start_handshake() + + def dataReceived(self, data): + if not self.noise.handshake_finished: + self.noise.read_message(data) + if not self.noise.handshake_finished: + self.transport.write(self.noise.write_message()) + else: + r = self.noise.decrypt(data) + # echo it + self.transport.write(self.noise.encrypt(r)) + +class TwistedNoiseServerFactory(Factory): + protocol = TwistedNoiseServerProtocol + + def __init__(self, server_key): + self.server_key = server_key + +class TNServerTest(unittest.TestCase): + def setUp(self): + self.server_key_pair = genkeypair() + + factory = TwistedNoiseServerFactory(server_key=self.server_key_pair[1]) + self.proto = factory.buildProtocol(('127.0.0.1', 0)) + self.tr = proto_helpers.StringTransport() + self.proto.makeConnection(self.tr) + + self.client_key_pair = genkeypair() + + def test_testprotocol(self): + # Create client + proto = NoiseConnection.from_name(b'Noise_XK_448_ChaChaPoly_SHA256') + proto.set_as_initiator() + + # Setup required keys + proto.set_keypair_from_private_bytes(Keypair.STATIC, self.client_key_pair[1]) + proto.set_keypair_from_public_bytes(Keypair.REMOTE_STATIC, self.server_key_pair[0]) + + proto.set_keypair_from_private_bytes(Keypair.STATIC, self.client_key_pair[1]) + proto.start_handshake() + + # Send first message + message = proto.write_message() + self.proto.dataReceived(message) + + # Get response + resp = self.tr.value() + self.tr.clear() + + # And process it + proto.read_message(resp) + + # Send second message + message = proto.write_message() + self.proto.dataReceived(message) + + # Finish handshake + self.assertTrue(proto.handshake_finished) + + ptmsg = b'this is a test message' + encmsg = proto.encrypt(ptmsg) + + self.proto.dataReceived(encmsg) + + # Get echo + resp = self.tr.value() + self.tr.clear() + + ptresp = proto.decrypt(resp) + + self.assertEqual(ptresp, ptmsg)