@@ -6,7 +6,9 @@ from .errors import ( | |||
from .helpers import ( | |||
SocksAddr, Socks4Addr, Socks5Addr, Socks4Auth, Socks5Auth | |||
) | |||
from .protocols import Socks4Protocol, Socks5Protocol, DEFAULT_LIMIT | |||
from .protocols import ( | |||
Socks4Protocol, Socks5Protocol, Socks5DGramProtocol, DEFAULT_LIMIT | |||
) | |||
__version__ = '0.2.6' | |||
@@ -17,6 +19,108 @@ __all__ = ('Socks4Protocol', 'Socks5Protocol', 'Socks4Auth', | |||
'InvalidServerReply', 'create_connection', 'open_connection') | |||
# https://stackoverflow.com/a/53789029 | |||
def chain__await__(f): | |||
return lambda *args, **kwargs: f(*args, **kwargs).__await__() | |||
class DGram(object): | |||
'''An object that represents a datagram object. | |||
Use the send method to send data to the remote host. | |||
To receive data, simply await on the instance, and the next available | |||
datagram will be returned when available. | |||
When done, call the close method to shut everything down.''' | |||
def __init__(self, socksproto, hdr): | |||
self._sockproto = socksproto | |||
self._hdr = hdr | |||
self._q = asyncio.Queue() | |||
def connection_made(self, transport): | |||
self._dgtrans = transport | |||
def datagram_received(self, data, addr): | |||
'''Process relay UDP packets from the SOCKS server.''' | |||
frag, addr, payload = self._sockproto.parse_udp(data) | |||
if frag != 0: | |||
return | |||
self._q.put_nowait((payload, addr)) | |||
@property | |||
def proxy_sockname(self): | |||
return self._sockproto.proxy_sockname | |||
def send(self, data): | |||
'''Send datagram to the SOCKS server. | |||
This will wrap the datagram as needed before sending it on. | |||
This currently does not fragment UDP packets.''' | |||
self._dgtrans.sendto(self._hdr + data, None) | |||
def close(self): | |||
pass | |||
@chain__await__ | |||
async def __await__(self): | |||
'''Receive a datagram.''' | |||
return await self._q.get() | |||
async def open_datagram(proxy, proxy_auth, dst, *, | |||
remote_resolve=True, loop=None, family=0, | |||
proto=0, flags=0, sock=None, local_addr=None, | |||
server_hostname=None, reader_limit=DEFAULT_LIMIT): | |||
'''Create a transport object used to receive and send UDP packets | |||
to dst, via the SOCKS v5 proxy specified by proxy. | |||
The returned value is an instance of DGram.''' | |||
loop = loop or asyncio.get_event_loop() | |||
waiter = asyncio.Future(loop=loop) | |||
def sockdgram_factory(): | |||
if not isinstance(proxy, Socks5Addr): | |||
raise ValueError('only SOCKS v5 supports UDP') | |||
return Socks5DGramProtocol(proxy=proxy, proxy_auth=proxy_auth, dst=dst, | |||
app_protocol_factory=None, | |||
waiter=waiter, remote_resolve=remote_resolve, | |||
loop=loop, server_hostname=server_hostname, | |||
reader_limit=reader_limit) | |||
try: | |||
# connect to socks proxy | |||
transport, protocol = await loop.create_connection( | |||
sockdgram_factory, proxy.host, proxy.port, family=family, | |||
proto=proto, flags=flags, sock=sock, local_addr=local_addr) | |||
except OSError as exc: | |||
raise SocksConnectionError( | |||
'[Errno %s] Can not connect to proxy %s:%d [%s]' % | |||
(exc.errno, proxy.host, proxy.port, exc.strerror)) from exc | |||
try: | |||
await waiter | |||
except Exception: # noqa | |||
transport.close() | |||
raise | |||
# Build the header that the SOCKS UDP relay expects | |||
# https://tools.ietf.org/html/rfc1928#section-7 | |||
hdr = (await protocol.build_dst_address(*dst))[0] | |||
hdr = protocol.flatten_req([ 0, 0, 0, ] + hdr) | |||
# connect to the UDP relay the socks server told us to | |||
dgtrans, dgproto = await loop.create_datagram_endpoint( | |||
lambda: DGram(protocol, hdr), remote_addr=protocol.proxy_sockname) | |||
return dgproto | |||
async def create_connection(protocol_factory, proxy, proxy_auth, dst, *, | |||
remote_resolve=True, loop=None, ssl=None, family=0, | |||
proto=0, flags=0, sock=None, local_addr=None, | |||
@@ -17,6 +17,8 @@ DEFAULT_LIMIT = getattr(asyncio.streams, '_DEFAULT_LIMIT', 2**16) | |||
class BaseSocksProtocol(asyncio.StreamReaderProtocol): | |||
cmd = c.SOCKS_CMD_CONNECT | |||
def __init__(self, proxy, proxy_auth, dst, app_protocol_factory, waiter, *, | |||
remote_resolve=True, loop=None, ssl=False, | |||
server_hostname=None, negotiate_done_cb=None, | |||
@@ -133,7 +135,8 @@ class BaseSocksProtocol(asyncio.StreamReaderProtocol): | |||
async def socks_request(self, cmd): | |||
raise NotImplementedError | |||
def write_request(self, request): | |||
@staticmethod | |||
def flatten_req(request): | |||
bdata = bytearray() | |||
for item in request: | |||
@@ -143,6 +146,11 @@ class BaseSocksProtocol(asyncio.StreamReaderProtocol): | |||
bdata += item | |||
else: | |||
raise ValueError('Unsupported item') | |||
return bdata | |||
def write_request(self, request): | |||
bdata = self.flatten_req(request) | |||
self._stream_writer.write(bdata) | |||
async def read_response(self, n): | |||
@@ -389,3 +397,37 @@ class Socks5Protocol(BaseSocksProtocol): | |||
port = struct.unpack('>H', port)[0] | |||
return addr, port | |||
async def build_udp(self, frag, addr, payload=b''): | |||
req, _ = await self.build_dst_address(*addr) | |||
return self.flatten_req([ 0, 0, frag ] + req + [ payload ]) | |||
@staticmethod | |||
def parse_udp(payload): | |||
resv, frag, atype = struct.unpack('>HBB', payload[:4]) | |||
if resv != 0: | |||
raise InvalidServerReply('SOCKS5 proxy server sent invalid data') | |||
pos = 4 | |||
if atype == c.SOCKS5_ATYP_IPv4: | |||
last = pos + 4 | |||
addr = socket.inet_ntoa(payload[pos:last]) | |||
elif atype == c.SOCKS5_ATYP_DOMAIN: | |||
length = payload[pos] | |||
pos += 1 | |||
last = pos + length | |||
addr = payload[pos:pos + length] | |||
addr = addr.decode('idna') | |||
elif atype == c.SOCKS5_ATYP_IPv6: | |||
last = pos + 16 | |||
addr = socket.inet_ntop(socket.AF_INET6, payload[pos:last]) | |||
else: | |||
raise InvalidServerReply('SOCKS5 proxy server sent invalid data') | |||
port = int.from_bytes(payload[last:last + 2], 'big') | |||
last += 2 | |||
return frag, (addr, port), payload[last:] | |||
class Socks5DGramProtocol(Socks5Protocol): | |||
cmd = c.SOCKS_CMD_UDP_ASSOCIATE |
@@ -1,12 +1,18 @@ | |||
import pytest | |||
import aiosocks | |||
import aiohttp | |||
import asyncio | |||
import os | |||
import ssl | |||
import struct | |||
from aiohttp import web | |||
from aiohttp.test_utils import RawTestServer | |||
from aiohttp.test_utils import make_mocked_coro | |||
from aiosocks.test_utils import FakeSocksSrv, FakeSocks4Srv | |||
from aiosocks.connector import ProxyConnector, ProxyClientRequest | |||
from aiosocks.errors import SocksConnectionError | |||
from async_timeout import timeout | |||
from unittest import mock | |||
async def test_socks4_connect_success(loop): | |||
@@ -56,6 +62,109 @@ async def test_socks4_srv_error(loop): | |||
assert '0x5b' in str(ct) | |||
# https://stackoverflow.com/a/55693498 | |||
def with_timeout(t): | |||
def wrapper(corofunc): | |||
async def run(*args, **kwargs): | |||
with timeout(t): | |||
return await corofunc(*args, **kwargs) | |||
return run | |||
return wrapper | |||
async def test_socks4_datagram_failure(): | |||
loop = asyncio.get_event_loop() | |||
async with FakeSocksSrv(loop, b'') as srv: | |||
addr = aiosocks.Socks4Addr('127.0.0.1', srv.port) | |||
with pytest.raises(ValueError): | |||
await aiosocks.open_datagram(addr, None, None, loop=loop) | |||
async def test_socks4_datagram_connect_failure(): | |||
loop = asyncio.get_event_loop() | |||
async def raiseconnerr(*args, **kwargs): | |||
raise OSError(1) | |||
async with FakeSocksSrv(loop, b'') as srv: | |||
addr = aiosocks.Socks4Addr('127.0.0.1', srv.port) | |||
with mock.patch.object(loop, 'create_connection', | |||
raiseconnerr), pytest.raises(SocksConnectionError): | |||
await aiosocks.open_datagram(addr, None, None, loop=loop) | |||
@with_timeout(2) | |||
async def test_socks5_datagram_success_anonymous(): | |||
# | |||
# This code is testing aiosocks.open_datagram. | |||
# | |||
# The server it is interacting with is srv (FakeSocksSrv). | |||
# | |||
# We mock the UDP Protocol to the SOCKS server w/ | |||
# sockservdgram (FakeDGramTransport) | |||
# | |||
# UDP packet flow: | |||
# dgram (DGram) -> sockservdgram (FakeDGramTransport) | |||
# which reflects it back for delivery | |||
# | |||
loop = asyncio.get_event_loop() | |||
pld = b'\x05\x00\x05\x00\x00\x01\x01\x01\x01\x01\x04W' | |||
respdata = b'response data' | |||
async with FakeSocksSrv(loop, pld) as srv: | |||
addr = aiosocks.Socks5Addr('127.0.0.1', srv.port) | |||
auth = aiosocks.Socks5Auth('usr', 'pwd') | |||
dname = 'python.org' | |||
portnum = 53 | |||
dst = (dname, portnum) | |||
class FakeDGramTransport(asyncio.DatagramTransport): | |||
def sendto(self, data, addr=None): | |||
# Verify correct packet was receieved | |||
frag, addr, payload = aiosocks.protocols.Socks5Protocol.parse_udp(data) | |||
assert frag == 0 | |||
assert addr == ('python.org', 53) | |||
assert payload == b'some data' | |||
# Send frag reply, make sure it's ignored | |||
ba = bytearray() | |||
ba.extend([ 0, 0, 1, 1, 2, 2, 2, 2, ]) | |||
ba += (53).to_bytes(2, 'big') | |||
ba += respdata | |||
dgram.datagram_received(ba, ('3.3.3.3', 0)) | |||
# Send reply | |||
# wish I could use build_udp here, but it's async | |||
ba = bytearray() | |||
ba.extend([ 0, 0, 0, 1, 2, 2, 2, 2, ]) | |||
ba += (53).to_bytes(2, 'big') | |||
ba += respdata | |||
dgram.datagram_received(ba, ('3.3.3.3', 0)) | |||
sockservdgram = FakeDGramTransport() | |||
async def fake_cde(factory, remote_addr): | |||
assert remote_addr == ('1.1.1.1', 1111) | |||
proto = factory() | |||
proto.connection_made(sockservdgram) | |||
return sockservdgram, proto | |||
with mock.patch.object(loop, 'create_datagram_endpoint', | |||
fake_cde) as m: | |||
dgram = await aiosocks.open_datagram(addr, None, dst, loop=loop) | |||
assert dgram.proxy_sockname == ('1.1.1.1', 1111) | |||
dgram.send(b'some data') | |||
# XXX -- assert from fakesockssrv | |||
assert await dgram == (respdata, ('2.2.2.2', 53)) | |||
dgram.close() | |||
async def test_socks5_connect_success_anonymous(loop): | |||
pld = b'\x05\x00\x05\x00\x00\x01\x01\x01\x01\x01\x04Wtest' | |||
@@ -8,6 +8,7 @@ from asyncio import coroutine as coro, sslproto | |||
from aiohttp.test_utils import make_mocked_coro | |||
import aiosocks.constants as c | |||
from aiosocks.protocols import BaseSocksProtocol | |||
from aiosocks.errors import InvalidServerReply | |||
def make_base(loop, *, dst=None, waiter=None, ap_factory=None, ssl=None): | |||
@@ -604,6 +605,52 @@ async def test_socks5_rd_addr_domain(loop): | |||
assert r == (b'python.org', 80) | |||
async def test_socks5_build_udp_ipv4(loop): | |||
proto = make_socks5(loop) | |||
assert (await proto.build_udp(5, ('1.2.3.4', 16)) == | |||
b'\x00\x00\x05\x01\x01\x02\x03\x04\x00\x10') | |||
async def test_socks5_parse_udp_ipv4(loop): | |||
proto = make_socks5(loop) | |||
frag, addr, data = proto.parse_udp(b'\x00\x00\x07\x01\x01\x02\x09\x04\x00\x20foobar') | |||
assert frag == 7 | |||
assert addr == ('1.2.9.4', 32) | |||
assert data == b'foobar' | |||
async def test_socks5_parse_udp_domain(loop): | |||
proto = make_socks5(loop) | |||
frag, addr, data = proto.parse_udp(b'\x00\x00\x07\x03\x06domain\x00\x20foobar') | |||
assert frag == 7 | |||
assert addr == ('domain', 32) | |||
assert data == b'foobar' | |||
async def test_socks5_parse_udp_ipv6(loop): | |||
proto = make_socks5(loop) | |||
frag, addr, data = proto.parse_udp(b'\x00\x00\x07\x04' | |||
b' \x01\r\xb8\x11\xa3\t\xd7\x1f4\x8a.\x07\xa0v]' | |||
b'\x00\x20foobar') | |||
assert frag == 7 | |||
assert addr == ('2001:db8:11a3:9d7:1f34:8a2e:7a0:765d', 32) | |||
assert data == b'foobar' | |||
async def test_socks5_parse_udp_invalid(loop): | |||
proto = make_socks5(loop) | |||
for i in [ | |||
b'\x01\x00\x07\x01\x01\x02\x09\x04\x00\x20foobar', | |||
b'\x00\x01\x07\x01\x01\x02\x09\x04\x00\x20foobar', | |||
b'\x00\x00\x07\x09\x01\x02\x09\x04\x00\x20foobar', | |||
]: | |||
with pytest.raises(InvalidServerReply): | |||
proto.parse_udp(i) | |||
async def test_socks5_socks_req_inv_ver(loop): | |||
proto = make_socks5(loop, r=[b'\x05\x00', b'\x04\x00\x00']) | |||