|
@@ -1,16 +1,13 @@ |
|
|
import unittest |
|
|
import unittest |
|
|
import aiohttp |
|
|
|
|
|
import aiosocks |
|
|
import aiosocks |
|
|
import asyncio |
|
|
import asyncio |
|
|
from aiohttp import web |
|
|
|
|
|
from aiosocks.connector import SocksConnector |
|
|
|
|
|
|
|
|
|
|
|
try: |
|
|
try: |
|
|
from asyncio import ensure_future |
|
|
from asyncio import ensure_future |
|
|
except ImportError: |
|
|
except ImportError: |
|
|
ensure_future = asyncio.async |
|
|
ensure_future = asyncio.async |
|
|
|
|
|
|
|
|
from .helpers import fake_socks_srv, find_unused_port |
|
|
|
|
|
|
|
|
from .helpers import fake_socks_srv |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TestCreateSocks4Connection(unittest.TestCase): |
|
|
class TestCreateSocks4Connection(unittest.TestCase): |
|
@@ -22,59 +19,49 @@ class TestCreateSocks4Connection(unittest.TestCase): |
|
|
self.loop.close() |
|
|
self.loop.close() |
|
|
|
|
|
|
|
|
def test_connect_success(self): |
|
|
def test_connect_success(self): |
|
|
server, port = self.loop.run_until_complete( |
|
|
|
|
|
fake_socks_srv(self.loop, b'\x00\x5a\x04W\x01\x01\x01\x01test') |
|
|
|
|
|
) |
|
|
|
|
|
addr = aiosocks.Socks4Addr('127.0.0.1', port) |
|
|
|
|
|
auth = aiosocks.Socks4Auth('usr') |
|
|
|
|
|
dst = ('python.org', 80) |
|
|
|
|
|
|
|
|
with fake_socks_srv(self.loop, |
|
|
|
|
|
b'\x00\x5a\x04W\x01\x01\x01\x01test') as port: |
|
|
|
|
|
addr = aiosocks.Socks4Addr('127.0.0.1', port) |
|
|
|
|
|
auth = aiosocks.Socks4Auth('usr') |
|
|
|
|
|
dst = ('python.org', 80) |
|
|
|
|
|
|
|
|
coro = aiosocks.create_connection( |
|
|
|
|
|
None, addr, auth, dst, loop=self.loop) |
|
|
|
|
|
transport, protocol = self.loop.run_until_complete(coro) |
|
|
|
|
|
|
|
|
coro = aiosocks.create_connection( |
|
|
|
|
|
None, addr, auth, dst, loop=self.loop) |
|
|
|
|
|
transport, protocol = self.loop.run_until_complete(coro) |
|
|
|
|
|
|
|
|
self.assertEqual(protocol.proxy_sockname, ('1.1.1.1', 1111)) |
|
|
|
|
|
|
|
|
self.assertEqual(protocol.proxy_sockname, ('1.1.1.1', 1111)) |
|
|
|
|
|
|
|
|
data = self.loop.run_until_complete(protocol._stream_reader.read(4)) |
|
|
|
|
|
self.assertEqual(data, b'test') |
|
|
|
|
|
|
|
|
data = self.loop.run_until_complete( |
|
|
|
|
|
protocol._stream_reader.read(4)) |
|
|
|
|
|
self.assertEqual(data, b'test') |
|
|
|
|
|
|
|
|
transport.close() |
|
|
|
|
|
server.close() |
|
|
|
|
|
self.loop.run_until_complete(server.wait_closed()) |
|
|
|
|
|
|
|
|
transport.close() |
|
|
|
|
|
|
|
|
def test_invalid_data(self): |
|
|
def test_invalid_data(self): |
|
|
server, port = self.loop.run_until_complete( |
|
|
|
|
|
fake_socks_srv(self.loop, b'\x01\x5a\x04W\x01\x01\x01\x01') |
|
|
|
|
|
) |
|
|
|
|
|
addr = aiosocks.Socks4Addr('127.0.0.1', port) |
|
|
|
|
|
auth = aiosocks.Socks4Auth('usr') |
|
|
|
|
|
dst = ('python.org', 80) |
|
|
|
|
|
|
|
|
|
|
|
with self.assertRaises(aiosocks.SocksError) as ct: |
|
|
|
|
|
coro = aiosocks.create_connection( |
|
|
|
|
|
None, addr, auth, dst, loop=self.loop) |
|
|
|
|
|
self.loop.run_until_complete(coro) |
|
|
|
|
|
self.assertIn('invalid data', str(ct.exception)) |
|
|
|
|
|
|
|
|
|
|
|
server.close() |
|
|
|
|
|
self.loop.run_until_complete(server.wait_closed()) |
|
|
|
|
|
|
|
|
with fake_socks_srv(self.loop, |
|
|
|
|
|
b'\x01\x5a\x04W\x01\x01\x01\x01') as port: |
|
|
|
|
|
addr = aiosocks.Socks4Addr('127.0.0.1', port) |
|
|
|
|
|
auth = aiosocks.Socks4Auth('usr') |
|
|
|
|
|
dst = ('python.org', 80) |
|
|
|
|
|
|
|
|
|
|
|
with self.assertRaises(aiosocks.SocksError) as ct: |
|
|
|
|
|
coro = aiosocks.create_connection( |
|
|
|
|
|
None, addr, auth, dst, loop=self.loop) |
|
|
|
|
|
self.loop.run_until_complete(coro) |
|
|
|
|
|
self.assertIn('invalid data', str(ct.exception)) |
|
|
|
|
|
|
|
|
def test_socks_srv_error(self): |
|
|
def test_socks_srv_error(self): |
|
|
server, port = self.loop.run_until_complete( |
|
|
|
|
|
fake_socks_srv(self.loop, b'\x00\x5b\x04W\x01\x01\x01\x01') |
|
|
|
|
|
) |
|
|
|
|
|
addr = aiosocks.Socks4Addr('127.0.0.1', port) |
|
|
|
|
|
auth = aiosocks.Socks4Auth('usr') |
|
|
|
|
|
dst = ('python.org', 80) |
|
|
|
|
|
|
|
|
|
|
|
with self.assertRaises(aiosocks.SocksError) as ct: |
|
|
|
|
|
coro = aiosocks.create_connection( |
|
|
|
|
|
None, addr, auth, dst, loop=self.loop) |
|
|
|
|
|
self.loop.run_until_complete(coro) |
|
|
|
|
|
self.assertIn('0x5b', str(ct.exception)) |
|
|
|
|
|
|
|
|
with fake_socks_srv(self.loop, |
|
|
|
|
|
b'\x00\x5b\x04W\x01\x01\x01\x01') as port: |
|
|
|
|
|
addr = aiosocks.Socks4Addr('127.0.0.1', port) |
|
|
|
|
|
auth = aiosocks.Socks4Auth('usr') |
|
|
|
|
|
dst = ('python.org', 80) |
|
|
|
|
|
|
|
|
server.close() |
|
|
|
|
|
self.loop.run_until_complete(server.wait_closed()) |
|
|
|
|
|
|
|
|
with self.assertRaises(aiosocks.SocksError) as ct: |
|
|
|
|
|
coro = aiosocks.create_connection( |
|
|
|
|
|
None, addr, auth, dst, loop=self.loop) |
|
|
|
|
|
self.loop.run_until_complete(coro) |
|
|
|
|
|
self.assertIn('0x5b', str(ct.exception)) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TestCreateSocks5Connect(unittest.TestCase): |
|
|
class TestCreateSocks5Connect(unittest.TestCase): |
|
@@ -86,250 +73,193 @@ class TestCreateSocks5Connect(unittest.TestCase): |
|
|
self.loop.close() |
|
|
self.loop.close() |
|
|
|
|
|
|
|
|
def test_connect_success_anonymous(self): |
|
|
def test_connect_success_anonymous(self): |
|
|
server, port = self.loop.run_until_complete( |
|
|
|
|
|
fake_socks_srv( |
|
|
|
|
|
|
|
|
with fake_socks_srv( |
|
|
self.loop, |
|
|
self.loop, |
|
|
b'\x05\x00\x05\x00\x00\x01\x01\x01\x01\x01\x04Wtest' |
|
|
|
|
|
) |
|
|
|
|
|
) |
|
|
|
|
|
addr = aiosocks.Socks5Addr('127.0.0.1', port) |
|
|
|
|
|
auth = aiosocks.Socks5Auth('usr', 'pwd') |
|
|
|
|
|
dst = ('python.org', 80) |
|
|
|
|
|
|
|
|
b'\x05\x00\x05\x00\x00\x01\x01\x01\x01\x01\x04Wtest') as port: |
|
|
|
|
|
addr = aiosocks.Socks5Addr('127.0.0.1', port) |
|
|
|
|
|
auth = aiosocks.Socks5Auth('usr', 'pwd') |
|
|
|
|
|
dst = ('python.org', 80) |
|
|
|
|
|
|
|
|
coro = aiosocks.create_connection( |
|
|
|
|
|
None, addr, auth, dst, loop=self.loop) |
|
|
|
|
|
transport, protocol = self.loop.run_until_complete(coro) |
|
|
|
|
|
|
|
|
coro = aiosocks.create_connection( |
|
|
|
|
|
None, addr, auth, dst, loop=self.loop) |
|
|
|
|
|
transport, protocol = self.loop.run_until_complete(coro) |
|
|
|
|
|
|
|
|
self.assertEqual(protocol.proxy_sockname, ('1.1.1.1', 1111)) |
|
|
|
|
|
|
|
|
self.assertEqual(protocol.proxy_sockname, ('1.1.1.1', 1111)) |
|
|
|
|
|
|
|
|
data = self.loop.run_until_complete(protocol._stream_reader.read(4)) |
|
|
|
|
|
self.assertEqual(data, b'test') |
|
|
|
|
|
|
|
|
data = self.loop.run_until_complete( |
|
|
|
|
|
protocol._stream_reader.read(4)) |
|
|
|
|
|
self.assertEqual(data, b'test') |
|
|
|
|
|
|
|
|
transport.close() |
|
|
|
|
|
server.close() |
|
|
|
|
|
self.loop.run_until_complete(server.wait_closed()) |
|
|
|
|
|
|
|
|
transport.close() |
|
|
|
|
|
|
|
|
def test_connect_success_usr_pwd(self): |
|
|
def test_connect_success_usr_pwd(self): |
|
|
server, port = self.loop.run_until_complete( |
|
|
|
|
|
fake_socks_srv( |
|
|
|
|
|
|
|
|
with fake_socks_srv( |
|
|
self.loop, |
|
|
self.loop, |
|
|
b'\x05\x02\x01\x00\x05\x00\x00\x01\x01\x01\x01\x01\x04Wtest' |
|
|
b'\x05\x02\x01\x00\x05\x00\x00\x01\x01\x01\x01\x01\x04Wtest' |
|
|
) |
|
|
|
|
|
) |
|
|
|
|
|
addr = aiosocks.Socks5Addr('127.0.0.1', port) |
|
|
|
|
|
auth = aiosocks.Socks5Auth('usr', 'pwd') |
|
|
|
|
|
dst = ('python.org', 80) |
|
|
|
|
|
|
|
|
|
|
|
coro = aiosocks.create_connection( |
|
|
|
|
|
None, addr, auth, dst, loop=self.loop) |
|
|
|
|
|
transport, protocol = self.loop.run_until_complete(coro) |
|
|
|
|
|
|
|
|
) as port: |
|
|
|
|
|
addr = aiosocks.Socks5Addr('127.0.0.1', port) |
|
|
|
|
|
auth = aiosocks.Socks5Auth('usr', 'pwd') |
|
|
|
|
|
dst = ('python.org', 80) |
|
|
|
|
|
|
|
|
self.assertEqual(protocol.proxy_sockname, ('1.1.1.1', 1111)) |
|
|
|
|
|
|
|
|
coro = aiosocks.create_connection( |
|
|
|
|
|
None, addr, auth, dst, loop=self.loop) |
|
|
|
|
|
transport, protocol = self.loop.run_until_complete(coro) |
|
|
|
|
|
|
|
|
data = self.loop.run_until_complete(protocol._stream_reader.read(4)) |
|
|
|
|
|
self.assertEqual(data, b'test') |
|
|
|
|
|
|
|
|
self.assertEqual(protocol.proxy_sockname, ('1.1.1.1', 1111)) |
|
|
|
|
|
|
|
|
transport.close() |
|
|
|
|
|
server.close() |
|
|
|
|
|
self.loop.run_until_complete(server.wait_closed()) |
|
|
|
|
|
|
|
|
data = self.loop.run_until_complete( |
|
|
|
|
|
protocol._stream_reader.read(4)) |
|
|
|
|
|
self.assertEqual(data, b'test') |
|
|
|
|
|
transport.close() |
|
|
|
|
|
|
|
|
def test_auth_ver_err(self): |
|
|
def test_auth_ver_err(self): |
|
|
server, port = self.loop.run_until_complete( |
|
|
|
|
|
fake_socks_srv(self.loop, b'\x04\x02') |
|
|
|
|
|
) |
|
|
|
|
|
addr = aiosocks.Socks5Addr('127.0.0.1', port) |
|
|
|
|
|
auth = aiosocks.Socks5Auth('usr', 'pwd') |
|
|
|
|
|
dst = ('python.org', 80) |
|
|
|
|
|
|
|
|
|
|
|
with self.assertRaises(aiosocks.SocksError) as ct: |
|
|
|
|
|
coro = aiosocks.create_connection( |
|
|
|
|
|
None, addr, auth, dst, loop=self.loop) |
|
|
|
|
|
self.loop.run_until_complete(coro) |
|
|
|
|
|
self.assertIn('invalid version', str(ct.exception)) |
|
|
|
|
|
|
|
|
with fake_socks_srv(self.loop, b'\x04\x02') as port: |
|
|
|
|
|
addr = aiosocks.Socks5Addr('127.0.0.1', port) |
|
|
|
|
|
auth = aiosocks.Socks5Auth('usr', 'pwd') |
|
|
|
|
|
dst = ('python.org', 80) |
|
|
|
|
|
|
|
|
server.close() |
|
|
|
|
|
self.loop.run_until_complete(server.wait_closed()) |
|
|
|
|
|
|
|
|
with self.assertRaises(aiosocks.SocksError) as ct: |
|
|
|
|
|
coro = aiosocks.create_connection( |
|
|
|
|
|
None, addr, auth, dst, loop=self.loop) |
|
|
|
|
|
self.loop.run_until_complete(coro) |
|
|
|
|
|
self.assertIn('invalid version', str(ct.exception)) |
|
|
|
|
|
|
|
|
def test_auth_method_rejected(self): |
|
|
def test_auth_method_rejected(self): |
|
|
server, port = self.loop.run_until_complete( |
|
|
|
|
|
fake_socks_srv(self.loop, b'\x05\xFF') |
|
|
|
|
|
) |
|
|
|
|
|
addr = aiosocks.Socks5Addr('127.0.0.1', port) |
|
|
|
|
|
auth = aiosocks.Socks5Auth('usr', 'pwd') |
|
|
|
|
|
dst = ('python.org', 80) |
|
|
|
|
|
|
|
|
|
|
|
with self.assertRaises(aiosocks.SocksError) as ct: |
|
|
|
|
|
coro = aiosocks.create_connection( |
|
|
|
|
|
None, addr, auth, dst, loop=self.loop) |
|
|
|
|
|
self.loop.run_until_complete(coro) |
|
|
|
|
|
self.assertIn('authentication methods were rejected', |
|
|
|
|
|
str(ct.exception)) |
|
|
|
|
|
|
|
|
|
|
|
server.close() |
|
|
|
|
|
self.loop.run_until_complete(server.wait_closed()) |
|
|
|
|
|
|
|
|
with fake_socks_srv(self.loop, b'\x05\xFF') as port: |
|
|
|
|
|
addr = aiosocks.Socks5Addr('127.0.0.1', port) |
|
|
|
|
|
auth = aiosocks.Socks5Auth('usr', 'pwd') |
|
|
|
|
|
dst = ('python.org', 80) |
|
|
|
|
|
|
|
|
|
|
|
with self.assertRaises(aiosocks.SocksError) as ct: |
|
|
|
|
|
coro = aiosocks.create_connection( |
|
|
|
|
|
None, addr, auth, dst, loop=self.loop) |
|
|
|
|
|
self.loop.run_until_complete(coro) |
|
|
|
|
|
self.assertIn('authentication methods were rejected', |
|
|
|
|
|
str(ct.exception)) |
|
|
|
|
|
|
|
|
def test_auth_status_invalid(self): |
|
|
def test_auth_status_invalid(self): |
|
|
server, port = self.loop.run_until_complete( |
|
|
|
|
|
fake_socks_srv(self.loop, b'\x05\xF0') |
|
|
|
|
|
) |
|
|
|
|
|
addr = aiosocks.Socks5Addr('127.0.0.1', port) |
|
|
|
|
|
auth = aiosocks.Socks5Auth('usr', 'pwd') |
|
|
|
|
|
dst = ('python.org', 80) |
|
|
|
|
|
|
|
|
|
|
|
with self.assertRaises(aiosocks.SocksError) as ct: |
|
|
|
|
|
coro = aiosocks.create_connection( |
|
|
|
|
|
None, addr, auth, dst, loop=self.loop) |
|
|
|
|
|
self.loop.run_until_complete(coro) |
|
|
|
|
|
self.assertIn('invalid data', str(ct.exception)) |
|
|
|
|
|
|
|
|
with fake_socks_srv(self.loop, b'\x05\xF0') as port: |
|
|
|
|
|
addr = aiosocks.Socks5Addr('127.0.0.1', port) |
|
|
|
|
|
auth = aiosocks.Socks5Auth('usr', 'pwd') |
|
|
|
|
|
dst = ('python.org', 80) |
|
|
|
|
|
|
|
|
server.close() |
|
|
|
|
|
self.loop.run_until_complete(server.wait_closed()) |
|
|
|
|
|
|
|
|
with self.assertRaises(aiosocks.SocksError) as ct: |
|
|
|
|
|
coro = aiosocks.create_connection( |
|
|
|
|
|
None, addr, auth, dst, loop=self.loop) |
|
|
|
|
|
self.loop.run_until_complete(coro) |
|
|
|
|
|
self.assertIn('invalid data', str(ct.exception)) |
|
|
|
|
|
|
|
|
def test_auth_status_invalid2(self): |
|
|
def test_auth_status_invalid2(self): |
|
|
server, port = self.loop.run_until_complete( |
|
|
|
|
|
fake_socks_srv(self.loop, b'\x05\x02\x02\x00') |
|
|
|
|
|
) |
|
|
|
|
|
addr = aiosocks.Socks5Addr('127.0.0.1', port) |
|
|
|
|
|
auth = aiosocks.Socks5Auth('usr', 'pwd') |
|
|
|
|
|
dst = ('python.org', 80) |
|
|
|
|
|
|
|
|
|
|
|
with self.assertRaises(aiosocks.SocksError) as ct: |
|
|
|
|
|
coro = aiosocks.create_connection( |
|
|
|
|
|
None, addr, auth, dst, loop=self.loop) |
|
|
|
|
|
self.loop.run_until_complete(coro) |
|
|
|
|
|
self.assertIn('invalid data', str(ct.exception)) |
|
|
|
|
|
|
|
|
with fake_socks_srv(self.loop, b'\x05\x02\x02\x00') as port: |
|
|
|
|
|
addr = aiosocks.Socks5Addr('127.0.0.1', port) |
|
|
|
|
|
auth = aiosocks.Socks5Auth('usr', 'pwd') |
|
|
|
|
|
dst = ('python.org', 80) |
|
|
|
|
|
|
|
|
server.close() |
|
|
|
|
|
self.loop.run_until_complete(server.wait_closed()) |
|
|
|
|
|
|
|
|
with self.assertRaises(aiosocks.SocksError) as ct: |
|
|
|
|
|
coro = aiosocks.create_connection( |
|
|
|
|
|
None, addr, auth, dst, loop=self.loop) |
|
|
|
|
|
self.loop.run_until_complete(coro) |
|
|
|
|
|
self.assertIn('invalid data', str(ct.exception)) |
|
|
|
|
|
|
|
|
def test_auth_failed(self): |
|
|
def test_auth_failed(self): |
|
|
server, port = self.loop.run_until_complete( |
|
|
|
|
|
fake_socks_srv(self.loop, b'\x05\x02\x01\x01') |
|
|
|
|
|
) |
|
|
|
|
|
addr = aiosocks.Socks5Addr('127.0.0.1', port) |
|
|
|
|
|
auth = aiosocks.Socks5Auth('usr', 'pwd') |
|
|
|
|
|
dst = ('python.org', 80) |
|
|
|
|
|
|
|
|
|
|
|
with self.assertRaises(aiosocks.SocksError) as ct: |
|
|
|
|
|
coro = aiosocks.create_connection( |
|
|
|
|
|
None, addr, auth, dst, loop=self.loop) |
|
|
|
|
|
self.loop.run_until_complete(coro) |
|
|
|
|
|
self.assertIn('authentication failed', str(ct.exception)) |
|
|
|
|
|
|
|
|
with fake_socks_srv(self.loop, b'\x05\x02\x01\x01') as port: |
|
|
|
|
|
addr = aiosocks.Socks5Addr('127.0.0.1', port) |
|
|
|
|
|
auth = aiosocks.Socks5Auth('usr', 'pwd') |
|
|
|
|
|
dst = ('python.org', 80) |
|
|
|
|
|
|
|
|
server.close() |
|
|
|
|
|
self.loop.run_until_complete(server.wait_closed()) |
|
|
|
|
|
|
|
|
with self.assertRaises(aiosocks.SocksError) as ct: |
|
|
|
|
|
coro = aiosocks.create_connection( |
|
|
|
|
|
None, addr, auth, dst, loop=self.loop) |
|
|
|
|
|
self.loop.run_until_complete(coro) |
|
|
|
|
|
self.assertIn('authentication failed', str(ct.exception)) |
|
|
|
|
|
|
|
|
def test_cmd_ver_err(self): |
|
|
def test_cmd_ver_err(self): |
|
|
server, port = self.loop.run_until_complete( |
|
|
|
|
|
fake_socks_srv(self.loop, b'\x05\x02\x01\x00\x04\x00\x00') |
|
|
|
|
|
) |
|
|
|
|
|
addr = aiosocks.Socks5Addr('127.0.0.1', port) |
|
|
|
|
|
auth = aiosocks.Socks5Auth('usr', 'pwd') |
|
|
|
|
|
dst = ('python.org', 80) |
|
|
|
|
|
|
|
|
|
|
|
with self.assertRaises(aiosocks.SocksError) as ct: |
|
|
|
|
|
coro = aiosocks.create_connection( |
|
|
|
|
|
None, addr, auth, dst, loop=self.loop) |
|
|
|
|
|
self.loop.run_until_complete(coro) |
|
|
|
|
|
self.assertIn('invalid version', str(ct.exception)) |
|
|
|
|
|
|
|
|
|
|
|
server.close() |
|
|
|
|
|
self.loop.run_until_complete(server.wait_closed()) |
|
|
|
|
|
|
|
|
with fake_socks_srv(self.loop, |
|
|
|
|
|
b'\x05\x02\x01\x00\x04\x00\x00') as port: |
|
|
|
|
|
addr = aiosocks.Socks5Addr('127.0.0.1', port) |
|
|
|
|
|
auth = aiosocks.Socks5Auth('usr', 'pwd') |
|
|
|
|
|
dst = ('python.org', 80) |
|
|
|
|
|
|
|
|
|
|
|
with self.assertRaises(aiosocks.SocksError) as ct: |
|
|
|
|
|
coro = aiosocks.create_connection( |
|
|
|
|
|
None, addr, auth, dst, loop=self.loop) |
|
|
|
|
|
self.loop.run_until_complete(coro) |
|
|
|
|
|
self.assertIn('invalid version', str(ct.exception)) |
|
|
|
|
|
|
|
|
def test_cmd_not_granted(self): |
|
|
def test_cmd_not_granted(self): |
|
|
server, port = self.loop.run_until_complete( |
|
|
|
|
|
fake_socks_srv(self.loop, b'\x05\x02\x01\x00\x05\x01\x00') |
|
|
|
|
|
) |
|
|
|
|
|
addr = aiosocks.Socks5Addr('127.0.0.1', port) |
|
|
|
|
|
auth = aiosocks.Socks5Auth('usr', 'pwd') |
|
|
|
|
|
dst = ('python.org', 80) |
|
|
|
|
|
|
|
|
|
|
|
with self.assertRaises(aiosocks.SocksError) as ct: |
|
|
|
|
|
coro = aiosocks.create_connection( |
|
|
|
|
|
None, addr, auth, dst, loop=self.loop) |
|
|
|
|
|
self.loop.run_until_complete(coro) |
|
|
|
|
|
self.assertIn('General SOCKS server failure', str(ct.exception)) |
|
|
|
|
|
|
|
|
|
|
|
server.close() |
|
|
|
|
|
self.loop.run_until_complete(server.wait_closed()) |
|
|
|
|
|
|
|
|
with fake_socks_srv(self.loop, |
|
|
|
|
|
b'\x05\x02\x01\x00\x05\x01\x00') as port: |
|
|
|
|
|
addr = aiosocks.Socks5Addr('127.0.0.1', port) |
|
|
|
|
|
auth = aiosocks.Socks5Auth('usr', 'pwd') |
|
|
|
|
|
dst = ('python.org', 80) |
|
|
|
|
|
|
|
|
|
|
|
with self.assertRaises(aiosocks.SocksError) as ct: |
|
|
|
|
|
coro = aiosocks.create_connection( |
|
|
|
|
|
None, addr, auth, dst, loop=self.loop) |
|
|
|
|
|
self.loop.run_until_complete(coro) |
|
|
|
|
|
self.assertIn('General SOCKS server failure', str(ct.exception)) |
|
|
|
|
|
|
|
|
def test_invalid_address_type(self): |
|
|
def test_invalid_address_type(self): |
|
|
server, port = self.loop.run_until_complete( |
|
|
|
|
|
fake_socks_srv(self.loop, b'\x05\x02\x01\x00\x05\x00\x00\xFF') |
|
|
|
|
|
) |
|
|
|
|
|
addr = aiosocks.Socks5Addr('127.0.0.1', port) |
|
|
|
|
|
auth = aiosocks.Socks5Auth('usr', 'pwd') |
|
|
|
|
|
dst = ('python.org', 80) |
|
|
|
|
|
|
|
|
|
|
|
with self.assertRaises(aiosocks.SocksError) as ct: |
|
|
|
|
|
coro = aiosocks.create_connection( |
|
|
|
|
|
None, addr, auth, dst, loop=self.loop) |
|
|
|
|
|
self.loop.run_until_complete(coro) |
|
|
|
|
|
self.assertIn('invalid data', str(ct.exception)) |
|
|
|
|
|
|
|
|
|
|
|
server.close() |
|
|
|
|
|
self.loop.run_until_complete(server.wait_closed()) |
|
|
|
|
|
|
|
|
with fake_socks_srv(self.loop, |
|
|
|
|
|
b'\x05\x02\x01\x00\x05\x00\x00\xFF') as port: |
|
|
|
|
|
addr = aiosocks.Socks5Addr('127.0.0.1', port) |
|
|
|
|
|
auth = aiosocks.Socks5Auth('usr', 'pwd') |
|
|
|
|
|
dst = ('python.org', 80) |
|
|
|
|
|
|
|
|
|
|
|
with self.assertRaises(aiosocks.SocksError) as ct: |
|
|
|
|
|
coro = aiosocks.create_connection( |
|
|
|
|
|
None, addr, auth, dst, loop=self.loop) |
|
|
|
|
|
self.loop.run_until_complete(coro) |
|
|
|
|
|
self.assertIn('invalid data', str(ct.exception)) |
|
|
|
|
|
|
|
|
def test_atype_ipv4(self): |
|
|
def test_atype_ipv4(self): |
|
|
server, port = self.loop.run_until_complete( |
|
|
|
|
|
fake_socks_srv( |
|
|
|
|
|
|
|
|
with fake_socks_srv( |
|
|
self.loop, |
|
|
self.loop, |
|
|
b'\x05\x02\x01\x00\x05\x00\x00\x01\x01\x01\x01\x01\x04W' |
|
|
b'\x05\x02\x01\x00\x05\x00\x00\x01\x01\x01\x01\x01\x04W' |
|
|
) |
|
|
|
|
|
) |
|
|
|
|
|
addr = aiosocks.Socks5Addr('127.0.0.1', port) |
|
|
|
|
|
auth = aiosocks.Socks5Auth('usr', 'pwd') |
|
|
|
|
|
dst = ('python.org', 80) |
|
|
|
|
|
|
|
|
) as port: |
|
|
|
|
|
addr = aiosocks.Socks5Addr('127.0.0.1', port) |
|
|
|
|
|
auth = aiosocks.Socks5Auth('usr', 'pwd') |
|
|
|
|
|
dst = ('python.org', 80) |
|
|
|
|
|
|
|
|
coro = aiosocks.create_connection( |
|
|
|
|
|
None, addr, auth, dst, loop=self.loop) |
|
|
|
|
|
transport, protocol = self.loop.run_until_complete(coro) |
|
|
|
|
|
|
|
|
coro = aiosocks.create_connection( |
|
|
|
|
|
None, addr, auth, dst, loop=self.loop) |
|
|
|
|
|
transport, protocol = self.loop.run_until_complete(coro) |
|
|
|
|
|
|
|
|
self.assertEqual(protocol.proxy_sockname, ('1.1.1.1', 1111)) |
|
|
|
|
|
|
|
|
self.assertEqual(protocol.proxy_sockname, ('1.1.1.1', 1111)) |
|
|
|
|
|
|
|
|
transport.close() |
|
|
|
|
|
server.close() |
|
|
|
|
|
self.loop.run_until_complete(server.wait_closed()) |
|
|
|
|
|
|
|
|
transport.close() |
|
|
|
|
|
|
|
|
def test_atype_ipv6(self): |
|
|
def test_atype_ipv6(self): |
|
|
server, port = self.loop.run_until_complete( |
|
|
|
|
|
fake_socks_srv( |
|
|
|
|
|
|
|
|
with fake_socks_srv( |
|
|
self.loop, |
|
|
self.loop, |
|
|
b'\x05\x02\x01\x00\x05\x00\x00\x04\x00\x00\x00\x00' |
|
|
b'\x05\x02\x01\x00\x05\x00\x00\x04\x00\x00\x00\x00' |
|
|
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x11\x04W') |
|
|
|
|
|
) |
|
|
|
|
|
addr = aiosocks.Socks5Addr('127.0.0.1', port) |
|
|
|
|
|
auth = aiosocks.Socks5Auth('usr', 'pwd') |
|
|
|
|
|
dst = ('python.org', 80) |
|
|
|
|
|
|
|
|
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x11\x04W' |
|
|
|
|
|
) as port: |
|
|
|
|
|
addr = aiosocks.Socks5Addr('127.0.0.1', port) |
|
|
|
|
|
auth = aiosocks.Socks5Auth('usr', 'pwd') |
|
|
|
|
|
dst = ('python.org', 80) |
|
|
|
|
|
|
|
|
coro = aiosocks.create_connection( |
|
|
|
|
|
None, addr, auth, dst, loop=self.loop) |
|
|
|
|
|
transport, protocol = self.loop.run_until_complete(coro) |
|
|
|
|
|
|
|
|
coro = aiosocks.create_connection( |
|
|
|
|
|
None, addr, auth, dst, loop=self.loop) |
|
|
|
|
|
transport, protocol = self.loop.run_until_complete(coro) |
|
|
|
|
|
|
|
|
self.assertEqual(protocol.proxy_sockname, ('::111', 1111)) |
|
|
|
|
|
|
|
|
self.assertEqual(protocol.proxy_sockname, ('::111', 1111)) |
|
|
|
|
|
|
|
|
transport.close() |
|
|
|
|
|
server.close() |
|
|
|
|
|
self.loop.run_until_complete(server.wait_closed()) |
|
|
|
|
|
|
|
|
transport.close() |
|
|
|
|
|
|
|
|
def test_atype_domain(self): |
|
|
def test_atype_domain(self): |
|
|
server, port = self.loop.run_until_complete( |
|
|
|
|
|
fake_socks_srv( |
|
|
|
|
|
|
|
|
with fake_socks_srv( |
|
|
self.loop, |
|
|
self.loop, |
|
|
b'\x05\x02\x01\x00\x05\x00\x00\x03\x0apython.org\x04W' |
|
|
b'\x05\x02\x01\x00\x05\x00\x00\x03\x0apython.org\x04W' |
|
|
) |
|
|
|
|
|
) |
|
|
|
|
|
addr = aiosocks.Socks5Addr('127.0.0.1', port) |
|
|
|
|
|
auth = aiosocks.Socks5Auth('usr', 'pwd') |
|
|
|
|
|
dst = ('python.org', 80) |
|
|
|
|
|
|
|
|
|
|
|
coro = aiosocks.create_connection( |
|
|
|
|
|
None, addr, auth, dst, loop=self.loop) |
|
|
|
|
|
transport, protocol = self.loop.run_until_complete(coro) |
|
|
|
|
|
|
|
|
) as port: |
|
|
|
|
|
addr = aiosocks.Socks5Addr('127.0.0.1', port) |
|
|
|
|
|
auth = aiosocks.Socks5Auth('usr', 'pwd') |
|
|
|
|
|
dst = ('python.org', 80) |
|
|
|
|
|
|
|
|
self.assertEqual(protocol.proxy_sockname, (b'python.org', 1111)) |
|
|
|
|
|
|
|
|
coro = aiosocks.create_connection( |
|
|
|
|
|
None, addr, auth, dst, loop=self.loop) |
|
|
|
|
|
transport, protocol = self.loop.run_until_complete(coro) |
|
|
|
|
|
|
|
|
transport.close() |
|
|
|
|
|
server.close() |
|
|
|
|
|
self.loop.run_until_complete(server.wait_closed()) |
|
|
|
|
|
|
|
|
self.assertEqual(protocol.proxy_sockname, (b'python.org', 1111)) |
|
|
|
|
|
|
|
|
|
|
|
transport.close() |