|
- import pytest
- import aiosocks
- import aiohttp
- import asyncio
- import os
- import ssl
- import struct
- from aiohttp import web
- from aiohttp.test_utils import RawTestServer
- from aiohttp.test_utils import make_mocked_coro
- from aiosocks.test_utils import FakeSocksSrv, FakeSocks4Srv
- from aiosocks.connector import ProxyConnector, ProxyClientRequest
- from aiosocks.errors import SocksConnectionError
- from aiosocks import constants as c
- from async_timeout import timeout
- from unittest import mock
-
-
async def test_socks4_connect_success(loop):
    """Connect through the fake SOCKS4 server and read the trailing payload.

    The canned reply ends with ``b'test'``; after the handshake those bytes
    must be readable from the protocol's stream reader.
    """
    canned_reply = b'\x00\x5a\x04W\x01\x01\x01\x01test'

    async with FakeSocksSrv(loop, canned_reply) as fake_srv:
        proxy_addr = aiosocks.Socks4Addr('127.0.0.1', fake_srv.port)
        proxy_auth = aiosocks.Socks4Auth('usr')
        target = ('python.org', 80)

        connection = await aiosocks.create_connection(
            None, proxy_addr, proxy_auth, target, loop=loop)
        transport, protocol = connection

        # b'\x04W' in the reply is port 1111, followed by address 1.1.1.1.
        assert protocol.proxy_sockname == ('1.1.1.1', 1111)
        assert await protocol._stream_reader.read(4) == b'test'

        transport.close()
-
-
async def test_socks4_invalid_data(loop):
    """A reply whose first byte is ``0x01`` must raise ``SocksError``.

    The error message is expected to mention ``'invalid data'``.
    """
    pld = b'\x01\x5a\x04W\x01\x01\x01\x01'

    async with FakeSocksSrv(loop, pld) as srv:
        addr = aiosocks.Socks4Addr('127.0.0.1', srv.port)
        auth = aiosocks.Socks4Auth('usr')
        dst = ('python.org', 80)

        with pytest.raises(aiosocks.SocksError) as ct:
            await aiosocks.create_connection(
                None, addr, auth, dst, loop=loop)
        # Inspect the raised exception itself, not the ExceptionInfo wrapper,
        # whose str() is an implementation detail of pytest.
        assert 'invalid data' in str(ct.value)
-
-
async def test_socks4_srv_error(loop):
    """A reply carrying status byte ``0x5b`` must raise ``SocksError``.

    The error message is expected to contain the status code ``'0x5b'``.
    """
    pld = b'\x00\x5b\x04W\x01\x01\x01\x01'

    async with FakeSocksSrv(loop, pld) as srv:
        addr = aiosocks.Socks4Addr('127.0.0.1', srv.port)
        auth = aiosocks.Socks4Auth('usr')
        dst = ('python.org', 80)

        with pytest.raises(aiosocks.SocksError) as ct:
            await aiosocks.create_connection(
                None, addr, auth, dst, loop=loop)
        # Inspect the raised exception itself, not the ExceptionInfo wrapper,
        # whose str() is an implementation detail of pytest.
        assert '0x5b' in str(ct.value)
-
-
# https://stackoverflow.com/a/55693498
def with_timeout(t):
    """Build a decorator that runs a coroutine function under a timeout.

    :param t: timeout value handed unchanged to ``async_timeout.timeout``.
    :return: a decorator; the decorated coroutine function awaits the
        original one inside ``async with timeout(t)`` and returns its result.

    NOTE(review): when the deadline passes, async_timeout is expected to
    raise ``asyncio.TimeoutError`` -- confirm against the installed version.
    """
    # Imported locally so this helper does not depend on a file-level import.
    import functools

    def wrapper(corofunc):
        # wraps() keeps the test's name, docstring and signature visible to
        # the test runner instead of the generic ``run(*args, **kwargs)``.
        @functools.wraps(corofunc)
        async def run(*args, **kwargs):
            # The synchronous ``with timeout(...)`` form is deprecated in
            # async_timeout; the async context manager is the supported one.
            async with timeout(t):
                return await corofunc(*args, **kwargs)
        return run
    return wrapper
-
async def test_socks4_datagram_failure():
    """``open_datagram`` given a ``Socks4Addr`` proxy must raise ValueError."""
    event_loop = asyncio.get_event_loop()

    async with FakeSocksSrv(event_loop, b'') as fake_srv:
        socks4_addr = aiosocks.Socks4Addr('127.0.0.1', fake_srv.port)

        with pytest.raises(ValueError):
            await aiosocks.open_datagram(
                socks4_addr, None, None, loop=event_loop)
-
async def test_socks4_datagram_connect_failure():
    """An ``OSError`` from ``loop.create_connection`` becomes
    ``SocksConnectionError`` when raised during ``open_datagram``.
    """
    event_loop = asyncio.get_event_loop()

    async def failing_create_connection(*args, **kwargs):
        # Stand-in for loop.create_connection that always fails.
        raise OSError(1)

    async with FakeSocksSrv(event_loop, b'') as fake_srv:
        proxy_addr = aiosocks.Socks4Addr('127.0.0.1', fake_srv.port)

        with mock.patch.object(event_loop, 'create_connection',
                               failing_create_connection):
            with pytest.raises(SocksConnectionError):
                await aiosocks.open_datagram(
                    proxy_addr, None, None, loop=event_loop)
-
@with_timeout(2)
async def test_socks5_datagram_success_anonymous():
    """Exercise ``aiosocks.open_datagram`` against the fake SOCKS5 server.

    The TCP side talks to ``srv`` (``FakeSocksSrv``).  The UDP relay is
    replaced by ``sockservdgram`` (``FakeDGramTransport``), installed by
    patching ``loop.create_datagram_endpoint``.

    UDP packet flow:
        dgram -> sockservdgram.sendto(), which checks the outgoing packet
        and feeds two packets back into ``dgram.datagram_received``: one
        with a non-zero fragment byte (expected to be ignored) and one
        regular reply.
    """
    loop = asyncio.get_event_loop()
    pld = b'\x05\x00\x05\x00\x00\x01\x01\x01\x01\x01\x04W'

    respdata = b'response data'

    async with FakeSocksSrv(loop, pld) as srv:
        addr = aiosocks.Socks5Addr('127.0.0.1', srv.port)
        dst = ('python.org', 53)

        def relay_packet(frag):
            # Hand-built relay packet: two zero bytes, the fragment byte,
            # address type 1 with address 2.2.2.2, port 53, then payload.
            # build_udp is not used here because it is async.
            ba = bytearray([0, 0, frag, 1, 2, 2, 2, 2])
            ba += (53).to_bytes(2, 'big')
            ba += respdata
            return ba

        # Fake SOCKS server UDP relay
        class FakeDGramTransport(asyncio.DatagramTransport):
            def sendto(self, data, addr=None):
                # Verify the correct packet was received
                frag, dest, payload = \
                    aiosocks.protocols.Socks5Protocol.parse_udp(data)
                assert frag == 0
                assert dest == dst
                assert payload == b'some data'

                # Send a fragmented reply first; it must be ignored
                dgram.datagram_received(relay_packet(1), ('3.3.3.3', 0))

                # Send the real reply
                dgram.datagram_received(relay_packet(0), ('3.3.3.3', 0))

        sockservdgram = FakeDGramTransport()

        # Fake the creation of the UDP relay
        async def fake_cde(factory, remote_addr):
            assert remote_addr == ('1.1.1.1', 1111)

            proto = factory()
            proto.connection_made(sockservdgram)

            return sockservdgram, proto

        # Open the UDP connection
        with mock.patch.object(loop, 'create_datagram_endpoint', fake_cde):
            dgram = await aiosocks.open_datagram(addr, None, dst, loop=loop)

        rdr = srv.get_reader()
        # Make sure we negotiated the correct command: the fifth byte the
        # client sent is compared with SOCKS_CMD_UDP_ASSOCIATE.
        assert (await rdr.readexactly(5))[4] == c.SOCKS_CMD_UDP_ASSOCIATE

        assert dgram.proxy_sockname == ('1.1.1.1', 1111)

        dgram.send(b'some data')
        # XXX -- assert from fakesockssrv

        assert await dgram == (respdata, ('2.2.2.2', 53))

        dgram.close()
-
-
async def test_socks5_connect_success_anonymous(loop):
    """SOCKS5 connect where the canned reply selects method ``0x00``.

    After the handshake the trailing ``b'test'`` bytes of the canned reply
    must be readable from the protocol's stream reader.
    """
    canned_reply = b'\x05\x00\x05\x00\x00\x01\x01\x01\x01\x01\x04Wtest'

    async with FakeSocksSrv(loop, canned_reply) as fake_srv:
        proxy_addr = aiosocks.Socks5Addr('127.0.0.1', fake_srv.port)
        proxy_auth = aiosocks.Socks5Auth('usr', 'pwd')
        target = ('python.org', 80)

        connection = await aiosocks.create_connection(
            None, proxy_addr, proxy_auth, target, loop=loop)
        transport, protocol = connection

        assert protocol.proxy_sockname == ('1.1.1.1', 1111)
        assert await protocol._stream_reader.read(4) == b'test'

        transport.close()
-
-
async def test_socks5_connect_success_usr_pwd(loop):
    """SOCKS5 connect where the canned reply selects method ``0x02``.

    The canned reply then carries ``b'\\x01\\x00'`` before the command reply;
    the trailing ``b'test'`` bytes must be readable afterwards.
    """
    canned_reply = (
        b'\x05\x02\x01\x00\x05\x00\x00\x01\x01\x01\x01\x01\x04Wtest')

    async with FakeSocksSrv(loop, canned_reply) as fake_srv:
        proxy_addr = aiosocks.Socks5Addr('127.0.0.1', fake_srv.port)
        proxy_auth = aiosocks.Socks5Auth('usr', 'pwd')
        target = ('python.org', 80)

        connection = await aiosocks.create_connection(
            None, proxy_addr, proxy_auth, target, loop=loop)
        transport, protocol = connection

        assert protocol.proxy_sockname == ('1.1.1.1', 1111)
        assert await protocol._stream_reader.read(4) == b'test'

        transport.close()
-
-
async def test_socks5_auth_ver_err(loop):
    """A method reply starting with ``0x04`` must raise ``SocksError``.

    The error message is expected to mention ``'invalid version'``.
    """
    async with FakeSocksSrv(loop, b'\x04\x02') as srv:
        addr = aiosocks.Socks5Addr('127.0.0.1', srv.port)
        auth = aiosocks.Socks5Auth('usr', 'pwd')
        dst = ('python.org', 80)

        with pytest.raises(aiosocks.SocksError) as ct:
            await aiosocks.create_connection(
                None, addr, auth, dst, loop=loop)
        # Check the exception's own message rather than the ExceptionInfo
        # wrapper returned by pytest.raises.
        assert 'invalid version' in str(ct.value)
-
-
async def test_socks5_auth_method_rejected(loop):
    """A method reply of ``0xFF`` must raise ``SocksError``.

    The error message is expected to mention that the authentication
    methods were rejected.
    """
    async with FakeSocksSrv(loop, b'\x05\xFF') as srv:
        addr = aiosocks.Socks5Addr('127.0.0.1', srv.port)
        auth = aiosocks.Socks5Auth('usr', 'pwd')
        dst = ('python.org', 80)

        with pytest.raises(aiosocks.SocksError) as ct:
            await aiosocks.create_connection(
                None, addr, auth, dst, loop=loop)
        # Check the exception's own message rather than the ExceptionInfo
        # wrapper returned by pytest.raises.
        assert 'authentication methods were rejected' in str(ct.value)
-
-
async def test_socks5_auth_status_invalid(loop):
    """A method reply of ``0xF0`` must raise ``SocksError``.

    The error message is expected to mention ``'invalid data'``.
    """
    async with FakeSocksSrv(loop, b'\x05\xF0') as srv:
        addr = aiosocks.Socks5Addr('127.0.0.1', srv.port)
        auth = aiosocks.Socks5Auth('usr', 'pwd')
        dst = ('python.org', 80)

        with pytest.raises(aiosocks.SocksError) as ct:
            await aiosocks.create_connection(
                None, addr, auth, dst, loop=loop)
        # Check the exception's own message rather than the ExceptionInfo
        # wrapper returned by pytest.raises.
        assert 'invalid data' in str(ct.value)
-
-
async def test_socks5_auth_status_invalid2(loop):
    """An auth reply starting with ``0x02`` must raise ``SocksError``.

    The canned reply selects method ``0x02`` and then answers the auth step
    with ``b'\\x02\\x00'``; the error message is expected to mention
    ``'invalid data'``.
    """
    async with FakeSocksSrv(loop, b'\x05\x02\x02\x00') as srv:
        addr = aiosocks.Socks5Addr('127.0.0.1', srv.port)
        auth = aiosocks.Socks5Auth('usr', 'pwd')
        dst = ('python.org', 80)

        with pytest.raises(aiosocks.SocksError) as ct:
            await aiosocks.create_connection(
                None, addr, auth, dst, loop=loop)
        # Check the exception's own message rather than the ExceptionInfo
        # wrapper returned by pytest.raises.
        assert 'invalid data' in str(ct.value)
-
-
async def test_socks5_auth_failed(loop):
    """An auth reply with status ``0x01`` must raise ``SocksError``.

    The error message is expected to mention ``'authentication failed'``.
    """
    async with FakeSocksSrv(loop, b'\x05\x02\x01\x01') as srv:
        addr = aiosocks.Socks5Addr('127.0.0.1', srv.port)
        auth = aiosocks.Socks5Auth('usr', 'pwd')
        dst = ('python.org', 80)

        with pytest.raises(aiosocks.SocksError) as ct:
            await aiosocks.create_connection(
                None, addr, auth, dst, loop=loop)
        # Check the exception's own message rather than the ExceptionInfo
        # wrapper returned by pytest.raises.
        assert 'authentication failed' in str(ct.value)
-
-
async def test_socks5_cmd_ver_err(loop):
    """A command reply starting with ``0x04`` must raise ``SocksError``.

    The error message is expected to mention ``'invalid version'``.
    """
    async with FakeSocksSrv(loop, b'\x05\x02\x01\x00\x04\x00\x00') as srv:
        addr = aiosocks.Socks5Addr('127.0.0.1', srv.port)
        auth = aiosocks.Socks5Auth('usr', 'pwd')
        dst = ('python.org', 80)

        with pytest.raises(aiosocks.SocksError) as ct:
            await aiosocks.create_connection(
                None, addr, auth, dst, loop=loop)
        # Check the exception's own message rather than the ExceptionInfo
        # wrapper returned by pytest.raises.
        assert 'invalid version' in str(ct.value)
-
-
async def test_socks5_cmd_not_granted(loop):
    """A command reply with status ``0x01`` must raise ``SocksError``.

    The error message is expected to mention
    ``'General SOCKS server failure'``.
    """
    async with FakeSocksSrv(loop, b'\x05\x02\x01\x00\x05\x01\x00') as srv:
        addr = aiosocks.Socks5Addr('127.0.0.1', srv.port)
        auth = aiosocks.Socks5Auth('usr', 'pwd')
        dst = ('python.org', 80)

        with pytest.raises(aiosocks.SocksError) as ct:
            await aiosocks.create_connection(
                None, addr, auth, dst, loop=loop)
        # Check the exception's own message rather than the ExceptionInfo
        # wrapper returned by pytest.raises.
        assert 'General SOCKS server failure' in str(ct.value)
-
-
async def test_socks5_invalid_address_type(loop):
    """A command reply with address type ``0xFF`` must raise ``SocksError``.

    The error message is expected to mention ``'invalid data'``.
    """
    async with FakeSocksSrv(loop, b'\x05\x02\x01\x00\x05\x00\x00\xFF') as srv:
        addr = aiosocks.Socks5Addr('127.0.0.1', srv.port)
        auth = aiosocks.Socks5Auth('usr', 'pwd')
        dst = ('python.org', 80)

        with pytest.raises(aiosocks.SocksError) as ct:
            await aiosocks.create_connection(
                None, addr, auth, dst, loop=loop)
        # Check the exception's own message rather than the ExceptionInfo
        # wrapper returned by pytest.raises.
        assert 'invalid data' in str(ct.value)
-
-
async def test_socks5_atype_ipv4(loop):
    """A reply with address type ``0x01`` yields an IPv4 ``proxy_sockname``."""
    canned_reply = b'\x05\x02\x01\x00\x05\x00\x00\x01\x01\x01\x01\x01\x04W'

    async with FakeSocksSrv(loop, canned_reply) as fake_srv:
        proxy_addr = aiosocks.Socks5Addr('127.0.0.1', fake_srv.port)
        proxy_auth = aiosocks.Socks5Auth('usr', 'pwd')
        target = ('python.org', 80)

        connection = await aiosocks.create_connection(
            None, proxy_addr, proxy_auth, target, loop=loop)
        transport, protocol = connection

        assert protocol.proxy_sockname == ('1.1.1.1', 1111)

        transport.close()
-
-
async def test_socks5_atype_ipv6(loop):
    """A reply with address type ``0x04`` yields an IPv6 ``proxy_sockname``.

    Two textual forms of the same address are accepted.
    """
    canned_reply = (
        b'\x05\x02\x01\x00\x05\x00\x00\x04\x00\x00\x00\x00'
        b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x11\x04W')

    async with FakeSocksSrv(loop, canned_reply) as fake_srv:
        proxy_addr = aiosocks.Socks5Addr('127.0.0.1', fake_srv.port)
        proxy_auth = aiosocks.Socks5Auth('usr', 'pwd')
        target = ('python.org', 80)

        connection = await aiosocks.create_connection(
            None, proxy_addr, proxy_auth, target, loop=loop)
        transport, protocol = connection

        accepted = (('::111', 1111), ('::0.0.1.17', 1111))
        assert protocol.proxy_sockname in accepted

        transport.close()
-
-
async def test_socks5_atype_domain(loop):
    """A reply with address type ``0x03`` yields a domain ``proxy_sockname``.

    The host part is compared as bytes (``b'python.org'``).
    """
    canned_reply = b'\x05\x02\x01\x00\x05\x00\x00\x03\x0apython.org\x04W'

    async with FakeSocksSrv(loop, canned_reply) as fake_srv:
        proxy_addr = aiosocks.Socks5Addr('127.0.0.1', fake_srv.port)
        proxy_auth = aiosocks.Socks5Auth('usr', 'pwd')
        target = ('python.org', 80)

        connection = await aiosocks.create_connection(
            None, proxy_addr, proxy_auth, target, loop=loop)
        transport, protocol = connection

        assert protocol.proxy_sockname == (b'python.org', 1111)

        transport.close()
-
-
async def test_http_connect(loop):
    """GET a plain-HTTP test server through the fake SOCKS4 proxy.

    The response must have status 200 and the body ``'Test message'``.
    """
    async def handler(request):
        return web.Response(text='Test message')

    async with RawTestServer(handler, host='127.0.0.1', loop=loop) as web_srv:
        async with FakeSocks4Srv(loop) as socks_srv:
            proxy_url = 'socks4://127.0.0.1:{}'.format(socks_srv.port)
            connector = ProxyConnector(loop=loop, remote_resolve=False)

            async with aiohttp.ClientSession(
                    connector=connector, loop=loop,
                    request_class=ProxyClientRequest) as session:
                target_url = web_srv.make_url('/')

                async with session.get(target_url, proxy=proxy_url) as resp:
                    assert resp.status == 200
                    body = await resp.text()
                    assert body == 'Test message'
-
-
async def test_https_connect(loop):
    """GET an HTTPS test server through the fake SOCKS4 proxy.

    Two connectors are exercised: one pinned to fingerprint ``v_fp``, which
    must get a 200 response with body ``'Test message'``, and one pinned to
    ``inv_fp``, which must raise ``aiohttp.ServerFingerprintMismatch``.
    """
    async def handler(request):
        return web.Response(text='Test message')

    here = os.path.join(os.path.dirname(__file__), '..', 'tests')
    keyfile = os.path.join(here, 'sample.key')
    certfile = os.path.join(here, 'sample.crt')
    sslcontext = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    sslcontext.load_cert_chain(certfile, keyfile)

    v_fp = (b'0\x9a\xc9D\x83\xdc\x91\'\x88\x91\x11\xa1d\x97\xfd'
            b'\xcb~7U\x14D@L'
            b'\x11\xab\x99\xa8\xae\xb7\x14\xee\x8b')
    inv_fp = (b'0\x9d\xc9D\x83\xdc\x91\'\x88\x91\x11\xa1d\x97\xfd'
              b'\xcb~7U\x14D@L'
              b'\x11\xab\x99\xa8\xae\xb7\x14\xee\x9e')

    ws = RawTestServer(handler, scheme='https', host='127.0.0.1', loop=loop)
    await ws.start_server(loop=loop, ssl=sslcontext)

    # The server is started by hand (to pass the SSL context), so it must
    # also be closed by hand; otherwise it leaks when the test ends or fails.
    try:
        async with FakeSocks4Srv(loop) as srv:
            proxy = 'socks4://127.0.0.1:{}'.format(srv.port)

            v_conn = ProxyConnector(loop=loop, remote_resolve=False,
                                    verify_ssl=False, fingerprint=v_fp)
            inv_conn = ProxyConnector(loop=loop, remote_resolve=False,
                                      verify_ssl=False, fingerprint=inv_fp)

            async with aiohttp.ClientSession(
                    connector=v_conn, loop=loop,
                    request_class=ProxyClientRequest) as ses:
                async with ses.get(ws.make_url('/'), proxy=proxy) as resp:
                    assert resp.status == 200
                    assert (await resp.text()) == 'Test message'

            async with aiohttp.ClientSession(
                    connector=inv_conn, loop=loop,
                    request_class=ProxyClientRequest) as ses:
                with pytest.raises(aiohttp.ServerFingerprintMismatch):
                    async with ses.get(ws.make_url('/'),
                                       proxy=proxy) as resp:
                        assert resp.status == 200
    finally:
        await ws.close()
|