Browse Source

add local end point for ssh to the controller...

this allows all comms, including generating an access token to happen
over https allowing a single entry point to the entire system...
main
John-Mark Gurney 3 years ago
parent
commit
612fa8b840
1 changed files with 148 additions and 1 deletions
  1. +148
    -1
      bitelab/__init__.py

+ 148
- 1
bitelab/__init__.py View File

@@ -686,6 +686,17 @@ async def root_test(board_prio: dict = Depends(board_priority),
settings: config.Settings = Depends(get_settings)):
return { 'foo': 'bar', 'board': board_prio }

@router.websocket("/ssh")
async def board_exec_ws(
websocket: WebSocket):
await websocket.accept()
try:
async with HandleExecOnly(websocket.receive_bytes,
websocket.send_bytes, always=[ 'sshd', '-i' ]) as server:
await server.get_finish_handler()
finally:
await websocket.close()

@router.get('/board/',response_model=Dict[str, Board])
async def get_boards(user: str = Depends(lookup_user),
brdmgr: BoardManager = Depends(get_boardmanager)):
@@ -751,6 +762,83 @@ async def reserve_board(board_id_or_class,

return brd

class HandleExecOnly(WSFWDServer):
def __init__(self, *args, always, **kwargs):
super().__init__(*args, **kwargs)

self._always = always
self._did_exec = False

self._finish_handler = asyncio.Event()

async def shutdown(self):
pass

async def process_stdin(self, data):
stdin = self._proc.stdin
stdin.write(data)
await stdin.drain()

async def process_stdout(self):
stdout = self._proc.stdout
stream = self._stdout_stream

try:
while True:
data = await stdout.read(16384)
if not data:
break
self.sendstream(stream, data)
await self.drain(stream)
finally:
await self.sendcmd(dict(cmd='chanclose', chan=stream))

async def process_proc_wait(self):
# Wait for process to exit
code = await self._proc.wait()
await self.sendcmd(dict(cmd='exit', code=code))

# Make sure that all stdout is sent
await self._stdout_task

await self._stdin_event.wait()

self._finish_handler.set()

async def handle_chanclose(self, msg):
self.clear_stream_handler(self._stdin_stream)
self._proc.stdin.close()
await self._proc.stdin.wait_closed()
self._stdin_event.set()

async def handle_exec(self, msg):
if self._did_exec:
raise RuntimeError('already did exec')

self._proc = await \
asyncio.create_subprocess_exec(*self._always,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)

self._did_exec = True

self._stdin_stream = msg['stdin']
self._stdout_stream = msg['stdout']

# handle stdin
self._stdin_event = asyncio.Event()
self.add_stream_handler(msg['stdin'], self.process_stdin)

# handle stdout
self._stdout_task = asyncio.create_task(self.process_stdout())

# handle process exit
self._proc_wait_task = asyncio.create_task(self.process_proc_wait())

async def get_finish_handler(self):
return await self._finish_handler.wait()

class HandleExec(WSFWDServer):
def __init__(self, *args, board_id, data, brdmgr, **kwargs):
super().__init__(*args, **kwargs)
@@ -1022,6 +1110,64 @@ class TestWebSocket(TestCommon):
shutil.rmtree(self.basetempdir)
self.basetempdir = None

@patch('asyncio.create_subprocess_exec')
@timeout(2)
async def test_ssh(self, cse):

def wrapper(corofun):
async def foo(*args, **kwargs):
r = await corofun(*args, **kwargs)
#print('foo:', repr(corofun), repr((args, kwargs)), repr(r))
return r

return foo

async with websockets.connect('ws://foo/ssh',
path=self.socketpath) as websocket, \
WSFWDClient(wrapper(websocket.recv), wrapper(websocket.send)) as client:
mstdout = AsyncMock()

cmdargs = [ 'sshd', '-i' ]

echodata = b'somedata'
wrap_subprocess_exec(cse, stdout=echodata, retcode=0)

# that the args doesn't matter
client.add_stream_handler(2, mstdout)
proc = await client.exec([ 'random', 'program' ], stdin=1, stdout=2)

# that it cannot be exec'd a second time
with self.assertRaises(RuntimeError):
await client.exec([ 'sshd', '-i' ], stdin=1, stdout=2)

stdin, stdout = proc.stdin, proc.stdout

stdin.write(echodata)
await stdin.drain()

# that we get our data
self.assertEqual(await stdout.read(len(echodata)), echodata)

# and that there is no more
self.assertEqual(await stdout.read(len(echodata)), b'')

# and we are truly at EOF
self.assertTrue(stdout.at_eof())

stdin.close()
await stdin.wait_closed()

await proc.wait()

cse.assert_called_with(*cmdargs,
stdin=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)

# spin things, not sure best way to handle this
await asyncio.sleep(.01)

cse.return_value.stdin.close.assert_called_with()

@patch('bitelab.snmp.snmpset')
@patch('bitelab.snmp.snmpget')
@patch('asyncio.create_subprocess_exec')
@@ -1060,7 +1206,7 @@ class TestWebSocket(TestCommon):
# That since the board isn't reserved, it fails
with self.assertRaisesRegex(RuntimeError,
'Board not reserved.'):
await client.exec([ 'sshd', '-i' ], stdin=1,
await client.exec(cmdargs, stdin=1,
stdout=2)

# that when the board is reserved by the wrong user
@@ -1084,6 +1230,7 @@ class TestWebSocket(TestCommon):
client.add_stream_handler(2, mstdout)
proc = await client.exec([ 'sshd', '-i' ], stdin=1, stdout=2)

# that it cannot be exec'd a second time
with self.assertRaises(RuntimeError):
await client.exec([ 'sshd', '-i' ], stdin=1, stdout=2)



Loading…
Cancel
Save