From 612fa8b840f9aa03affb45fea22421edfbbf2484 Mon Sep 17 00:00:00 2001 From: John-Mark Gurney Date: Wed, 23 Dec 2020 15:33:29 -0800 Subject: [PATCH] add local end point for ssh to the controller... this allows all comms, including generating an access token to happen over https allowing a single entry point to the entire system... --- bitelab/__init__.py | 149 +++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 148 insertions(+), 1 deletion(-) diff --git a/bitelab/__init__.py b/bitelab/__init__.py index ca56147..d431387 100644 --- a/bitelab/__init__.py +++ b/bitelab/__init__.py @@ -686,6 +686,17 @@ async def root_test(board_prio: dict = Depends(board_priority), settings: config.Settings = Depends(get_settings)): return { 'foo': 'bar', 'board': board_prio } +@router.websocket("/ssh") +async def board_exec_ws( + websocket: WebSocket): + await websocket.accept() + try: + async with HandleExecOnly(websocket.receive_bytes, + websocket.send_bytes, always=[ 'sshd', '-i' ]) as server: + await server.get_finish_handler() + finally: + await websocket.close() + @router.get('/board/',response_model=Dict[str, Board]) async def get_boards(user: str = Depends(lookup_user), brdmgr: BoardManager = Depends(get_boardmanager)): @@ -751,6 +762,83 @@ async def reserve_board(board_id_or_class, return brd +class HandleExecOnly(WSFWDServer): + def __init__(self, *args, always, **kwargs): + super().__init__(*args, **kwargs) + + self._always = always + self._did_exec = False + + self._finish_handler = asyncio.Event() + + async def shutdown(self): + pass + + async def process_stdin(self, data): + stdin = self._proc.stdin + stdin.write(data) + await stdin.drain() + + async def process_stdout(self): + stdout = self._proc.stdout + stream = self._stdout_stream + + try: + while True: + data = await stdout.read(16384) + if not data: + break + self.sendstream(stream, data) + await self.drain(stream) + finally: + await self.sendcmd(dict(cmd='chanclose', chan=stream)) + + async def process_proc_wait(self): + # Wait for process to exit + code = await self._proc.wait() + await self.sendcmd(dict(cmd='exit', code=code)) + + # Make sure that all stdout is sent + await self._stdout_task + + await self._stdin_event.wait() + + self._finish_handler.set() + + async def handle_chanclose(self, msg): + self.clear_stream_handler(self._stdin_stream) + self._proc.stdin.close() + await self._proc.stdin.wait_closed() + self._stdin_event.set() + + async def handle_exec(self, msg): + if self._did_exec: + raise RuntimeError('already did exec') + + self._proc = await \ + asyncio.create_subprocess_exec(*self._always, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT) + + self._did_exec = True + + self._stdin_stream = msg['stdin'] + self._stdout_stream = msg['stdout'] + + # handle stdin + self._stdin_event = asyncio.Event() + self.add_stream_handler(msg['stdin'], self.process_stdin) + + # handle stdout + self._stdout_task = asyncio.create_task(self.process_stdout()) + + # handle process exit + self._proc_wait_task = asyncio.create_task(self.process_proc_wait()) + + async def get_finish_handler(self): + return await self._finish_handler.wait() + class HandleExec(WSFWDServer): def __init__(self, *args, board_id, data, brdmgr, **kwargs): super().__init__(*args, **kwargs) @@ -1022,6 +1110,64 @@ class TestWebSocket(TestCommon): shutil.rmtree(self.basetempdir) self.basetempdir = None + @patch('asyncio.create_subprocess_exec') + @timeout(2) + async def test_ssh(self, cse): + + def wrapper(corofun): + async def foo(*args, **kwargs): + r = await corofun(*args, **kwargs) + #print('foo:', repr(corofun), repr((args, kwargs)), repr(r)) + return r + + return foo + + async with websockets.connect('ws://foo/ssh', + path=self.socketpath) as websocket, \ + WSFWDClient(wrapper(websocket.recv), wrapper(websocket.send)) as client: + mstdout = AsyncMock() + + cmdargs = [ 'sshd', '-i' ] + + echodata = b'somedata' + wrap_subprocess_exec(cse, stdout=echodata, retcode=0) + + # that the args doesn't matter + client.add_stream_handler(2, mstdout) + proc = await client.exec([ 'random', 'program' ], stdin=1, stdout=2) + + # that it cannot be exec'd a second time + with self.assertRaises(RuntimeError): + await client.exec([ 'sshd', '-i' ], stdin=1, stdout=2) + + stdin, stdout = proc.stdin, proc.stdout + + stdin.write(echodata) + await stdin.drain() + + # that we get our data + self.assertEqual(await stdout.read(len(echodata)), echodata) + + # and that there is no more + self.assertEqual(await stdout.read(len(echodata)), b'') + + # and we are truly at EOF + self.assertTrue(stdout.at_eof()) + + stdin.close() + await stdin.wait_closed() + + await proc.wait() + + cse.assert_called_with(*cmdargs, + stdin=subprocess.PIPE, stdout=subprocess.PIPE, + stderr=subprocess.STDOUT) + + # spin things, not sure best way to handle this + await asyncio.sleep(.01) + + cse.return_value.stdin.close.assert_called_with() + @patch('bitelab.snmp.snmpset') @patch('bitelab.snmp.snmpget') @patch('asyncio.create_subprocess_exec') @@ -1060,7 +1206,7 @@ class TestWebSocket(TestCommon): # That since the board isn't reserved, it fails with self.assertRaisesRegex(RuntimeError, 'Board not reserved.'): - await client.exec([ 'sshd', '-i' ], stdin=1, + await client.exec(cmdargs, stdin=1, stdout=2) # that when the board is reserved by the wrong user @@ -1084,6 +1230,7 @@ class TestWebSocket(TestCommon): client.add_stream_handler(2, mstdout) proc = await client.exec([ 'sshd', '-i' ], stdin=1, stdout=2) + # that it cannot be exec'd a second time with self.assertRaises(RuntimeError): await client.exec([ 'sshd', '-i' ], stdin=1, stdout=2)