- from noise.connection import NoiseConnection, Keypair
- from twistednoise import genkeypair
- import os.path
- import shutil
- import socket
- import tempfile
- import threading
- import unittest
- def _makeunix(path):
- '''Make a properly formed unix path socket string.'''
- return 'unix:%s' % path
- def _acceptfun(s, fun):
- while True:
- sock = s.accept()
- fun(*sock)
- def listensocket(sockstr, fun):
- '''Listen for connections on sockstr. When ever a connection
- is accepted, the parameter fun is called with the socket and
- the from address. The return will be a Thread object. Note
- that fun MUST NOT block, as if it does, it will stop accepting
- other connections.
- The format of sockstr is: 'proto:param=value[,param2=value2]'.
- If the proto has a default parameter, the value can be used
- directly, like: 'proto:value'. This is only allowed when the
- value can unambiguously be determined not to be a param.
- The characters that define 'param' must be all lower case ascii
- characters and may contain an underscore. The first character
- must not be and underscore.
- Supported protocols:
- unix:
- Default parameter is path.
- The path parameter specifies the path to the
- unix domain socket. The path MUST start w/ a
- slash if it is used as a default parameter.
- '''
- proto, rem = sockstr.split(':', 1)
- s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
- s.bind(rem)
- s.listen(-1)
- thr = threading.Thread(target=_acceptfun, name='accept thread: %s' % repr(sockstr), args=(s, fun))
- thr.setDaemon(True)
- thr.start()
- return thr
- class NoiseForwarder(object):
- def __init__(self, mode, sock, ):
- nf = NoiseForwarder('resp', self.server_key_pair[1], ssock, pttarg)
- pass
- class TestListenSocket(unittest.TestCase):
- def test_listensocket(self):
- # XXX write test
- pass
- class Tests(unittest.TestCase):
- def setUp(self):
- # setup temporary directory
- d = os.path.realpath(tempfile.mkdtemp())
- self.basetempdir = d
- self.tempdir = os.path.join(d, 'subdir')
- os.mkdir(self.tempdir)
- # Generate key pairs
- self.server_key_pair = genkeypair()
- self.client_key_pair = genkeypair()
- def tearDown(self):
- shutil.rmtree(self.basetempdir)
- self.tempdir = None
- def test_server(self):
- # Path that the server will sit on
- servsockpath = os.path.join(self.tempdir, 'servsock')
- servarg = _makeunix(servsockpath)
- # Path that the server will send pt data to
- servsockpath = os.path.join(self.tempdir, 'servptsock')
- # Setup pt target listener
- pttarg = _makeunix(servsockpath)
- ptsock = []
- def ptsockaccept(sock, frm, ptsock=ptsock):
- ptsock.append(sock)
- # Bind to pt listener
- lsock = listensocket(pttarg, ptsockaccept)
- # Setup server listener
- ssock = listensocket(servarg, lambda x, y: NoiseForwarder('resp', self.server_key_pair[1], x, pttarg))
- # Create client
- proto = NoiseConnection.from_name(b'Noise_XK_448_ChaChaPoly_SHA256')
- proto.set_as_initiator()
- # Setup required keys
- proto.set_keypair_from_private_bytes(Keypair.STATIC, self.client_key_pair[1])
- proto.set_keypair_from_public_bytes(Keypair.REMOTE_STATIC, self.server_key_pair[0])
- proto.start_handshake()
- # Send first message
- message = proto.write_message()