|
|
@@ -38,9 +38,9 @@ def make_socks4(loop, *, addr=None, auth=None, rr=True, dst=None, r=b'', |
|
|
|
|
|
|
|
|
|
|
|
def make_socks5(loop, *, addr=None, auth=None, rr=True, dst=None, r=None, |
|
|
|
ap_factory=None, whiter=None): |
|
|
|
ap_factory=None, whiter=None, user='user', pwd='pwd'): |
|
|
|
addr = addr or aiosocks.Socks5Addr('localhost', 1080) |
|
|
|
auth = auth or aiosocks.Socks5Auth('user', 'pwd') |
|
|
|
auth = auth or aiosocks.Socks5Auth(user, pwd) |
|
|
|
dst = dst or ('python.org', 80) |
|
|
|
|
|
|
|
proto = aiosocks.Socks5Protocol( |
|
|
@@ -497,6 +497,26 @@ async def test_socks5_auth_usr_pwd_granted(loop): |
|
|
|
]) |
|
|
|
|
|
|
|
|
|
|
|
async def test_socks5_auth_usr_pwd_granted_long_usr(loop): |
|
|
|
proto = make_socks5(loop, r=(b'\x05\x02', b'\x01\x00',), user='0'*200) |
|
|
|
await proto.authenticate() |
|
|
|
|
|
|
|
proto._stream_writer.write.assert_has_calls([ |
|
|
|
mock.call(b'\x05\x02\x00\x02'), |
|
|
|
mock.call(b'\x01\xc8' + b'0' * 200 + b'\x03pwd') |
|
|
|
]) |
|
|
|
|
|
|
|
|
|
|
|
async def test_socks5_auth_usr_pwd_granted_long_pwd(loop): |
|
|
|
proto = make_socks5(loop, r=(b'\x05\x02', b'\x01\x00',), pwd='0'*200) |
|
|
|
await proto.authenticate() |
|
|
|
|
|
|
|
proto._stream_writer.write.assert_has_calls([ |
|
|
|
mock.call(b'\x05\x02\x00\x02'), |
|
|
|
mock.call(b'\x01\x04user\xc8' + b'0' * 200) |
|
|
|
]) |
|
|
|
|
|
|
|
|
|
|
|
async def test_socks5_auth_invalid_reply(loop): |
|
|
|
proto = make_socks5(loop, r=(b'\x05\x02', b'\x00\x00',)) |
|
|
|
|
|
|
@@ -542,6 +562,14 @@ async def test_socks5_build_dst_addr_domain_with_remote_resolve(loop): |
|
|
|
assert resolved == ('python.org', 80) |
|
|
|
|
|
|
|
|
|
|
|
async def test_socks5_build_dst_addr_domain_with_remote_resolve(loop): |
|
|
|
proto = make_socks5(loop) |
|
|
|
dst_req, resolved = await proto.build_dst_address('python.org' * 20, 80) |
|
|
|
|
|
|
|
assert dst_req == [0x03, b'\xc8', b'python.org' * 20, b'\x00P'] |
|
|
|
assert resolved == ('python.org' * 20, 80) |
|
|
|
|
|
|
|
|
|
|
|
async def test_socks5_build_dst_addr_domain_with_locale_resolve(loop): |
|
|
|
proto = make_socks5(loop, rr=False) |
|
|
|
dst_req, resolved = await proto.build_dst_address('python.org', 80) |
|
|
|