|
- #
- # Copyright (c) 2020 The FreeBSD Foundation
- #
- # This software was developed by John-Mark Gurney under sponsorship
- # from the FreeBSD Foundation.
- #
- # Redistribution and use in source and binary forms, with or without
- # modification, are permitted provided that the following conditions
- # are met:
- # 1. Redistributions of source code must retain the above copyright
- # notice, this list of conditions and the following disclaimer.
- # 2. Redistributions in binary form must reproduce the above copyright
- # notice, this list of conditions and the following disclaimer in the
- # documentation and/or other materials provided with the distribution.
- #
- # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
- # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
- # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
- # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
- # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
- # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
- # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
- # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
- # SUCH DAMAGE.
- #
-
- from dataclasses import dataclass
- from functools import lru_cache, wraps
- from io import StringIO
- from typing import Optional, Union, Dict, Any
-
- from fastapi import APIRouter, Body, Depends, FastAPI, HTTPException
- from fastapi import Path, Request
- from fastapi.security import OAuth2PasswordBearer
- from fastapi.websockets import WebSocket
- from httpx import AsyncClient, Auth
- from starlette.responses import JSONResponse
- from starlette.status import HTTP_200_OK
- from starlette.status import HTTP_400_BAD_REQUEST, HTTP_401_UNAUTHORIZED, \
- HTTP_403_FORBIDDEN, HTTP_404_NOT_FOUND, HTTP_409_CONFLICT
- from starlette.status import HTTP_500_INTERNAL_SERVER_ERROR
- from unittest.mock import create_autospec, patch, AsyncMock, Mock, PropertyMock
- from wsfwd import WSFWDServer, WSFWDClient, timeout, _tbprinter
-
- # For WebSocket testing
- from hypercorn.config import Config
- from hypercorn.asyncio import serve
-
- from . import config
- from .data import *
- from .abstract import *
- from .snmp import *
- from .mocks import *
- from .iso8601 import parse_date
-
- import asyncio
- import contextlib
- import json
- import logging
- import orm
- import os
- import shutil
- import socket
- import sqlite3
- import subprocess
- import sys
- import tempfile
- import time
- import ucl
- import unittest
- import urllib
- import websockets
-
- epsilon = sys.float_info.epsilon
-
- # fix up parse_socket_addr for hypercorn
- from hypercorn.utils import parse_socket_addr
- from hypercorn.asyncio import tcp_server
def new_parse_socket_addr(domain, addr):
    '''Replacement for hypercorn's parse_socket_addr that also copes
    with unix domain sockets.

    For AF_UNIX, addr is returned unparsed as (addr, -1).  Every
    other domain is handed to the original parse_socket_addr.
    '''

    if domain != socket.AF_UNIX:
        return parse_socket_addr(domain, addr)

    return (addr, -1)
-
- tcp_server.parse_socket_addr = new_parse_socket_addr
-
def looptoutc(looptime):
    '''Convert looptime, a time stamp on the running event loop's
    clock, to a UTC time stamp (as returned by time.time()).

    The conversion samples both clocks now and applies the distance
    between looptime and the loop's current time to the current UTC
    time.  As the loop's clock is not expected to stay in sync w/
    UTC, the offset is recomputed on every call, which copes with
    drift, but it also means that the further the result is from the
    current time, the less accurate it is.

    Only tested to nanosecond precision.  Floating point does not
    give more, and the two clocks cannot be sampled in a single
    call, so the sampling itself likely adds a larger error.

    Must be called with an event loop running.
    '''

    loop_now = asyncio.get_running_loop().time()
    utc_now = time.time()

    return utc_now + (looptime - loop_now)
-
def utctoloop(utctime):
    '''Convert utctime, a UTC time stamp (as returned by time.time()),
    to a time stamp on the running event loop's clock.

    This is the inverse of looptoutc, and the same accuracy caveats
    apply: the offset between the clocks is sampled at call time, so
    values far from the current time are less accurate.

    Must be called with an event loop running.
    '''

    loop_now = asyncio.get_running_loop().time()
    utc_now = time.time()

    return loop_now + (utctime - utc_now)
-
async def log_event(tag, board=None, user=None, extra=None):
    '''Log an event as a single JSON object via logging.info.

    tag is stored under the key 'event'.  If board is given, its
    name is stored under 'board_name'; if user is given, it is stored
    under 'user'.  When either is None, any key of that name coming
    from extra is dropped, so extra cannot spoof them.

    extra is an optional mapping of additional keys to include.  It
    is copied, never modified.  None (the default) means no extra
    keys.

    The key 'date' is always set to the current UTC time in ISO 8601
    format with millisecond resolution.
    '''

    # Work on a copy so that the caller's mapping is left untouched.
    info = {} if extra is None else dict(extra)
    info['event'] = tag

    if board is not None:
        info['board_name'] = board.name
    else:
        info.pop('board_name', None)

    if user is not None:
        info['user'] = user
    else:
        info.pop('user', None)

    t = time.time()
    info['date'] = time.strftime('%Y-%m-%dT%H:%M:%S', time.gmtime(t)) + \
        '.%03dZ' % (int((t * 1000) % 1000),)

    logging.info(json.dumps(info))
-
class TimeOut(Attribute):
    '''
    Implement a TimeOut functionality.  The argument val (first and
    only) to __init__ is a number of seconds for the timeout to
    last.  This will start the time ticking on activation.  If it
    is not deactivated before the timer expires, a task is started
    that takes the board's lock and calls the board's release().

    Note that this uses the asyncio loop timescale and NOT UTC.  This
    means that over large durations, the clock will drift.  This means
    that over time, the "expired" time will change.

    While the board is not activated, it will display the timeout in
    seconds.  When the board is activated, the getvalue will return
    the time the board will be deactivated.
    '''

    defattrname = 'timeout'

    def __init__(self, val):
        # timeout length in seconds
        self._value = val
        # board this timeout was last activated for
        self._brd = None

        # protected by brd.lock
        # _cb: handle of the scheduled timeout_callback, if any
        # _task: task running timeout_coro once the timer has fired
        # _exp: expiry on the loop's clock, None while not activated
        self._cb = None
        self._task = None
        self._exp = None

    async def getvalue(self):
        # Not activated: report the configured number of seconds.
        if self._exp is None:
            return self._value

        # Activated: report the expiry as an ISO 8601 UTC time stamp
        # with millisecond resolution.
        t = looptoutc(self._exp)

        return time.strftime('%Y-%m-%dT%H:%M:%S', time.gmtime(t)) + \
            '.%03dZ' % (int((t * 1000) % 1000),)

    async def setvalue(self, v):
        # Move the expiry to the time v, a date string understood by
        # parse_date.  Only allowed while activated, and the new time
        # must not be later than the current expiry.
        #
        # Raises RuntimeError when not activated and ValueError when
        # the new time is after the current expiry.
        #
        # NOTE(review): this takes brd.lock itself, while
        # BoardImpl.update_attrs asserts the lock is already held
        # before calling setvalue.  asyncio.Lock is not reentrant, so
        # that path looks like it would block forever -- confirm.
        if self._exp is None:
            raise RuntimeError('cannot set when not activate')

        loop = asyncio.get_running_loop()

        t = parse_date(v).timestamp()
        loopvalue = utctoloop(t)

        async with self._brd.lock:
            if loopvalue > self._exp:
                raise ValueError('value in the future')

            # I really don't know how test this
            if self._task is not None:
                raise ValueError('should never happen')

            # Drop the old timer, then schedule the new one.
            # deactivate() clears _exp, so it is set again below.
            await self.deactivate(self._brd)
            self._cb = loop.call_at(loopvalue, self.timeout_callback)
            self._exp = self._cb.when()

    async def activate(self, brd):
        # Start the timer for brd.  The caller must hold brd.lock.
        assert brd.lock.locked()

        loop = asyncio.get_running_loop()
        self._brd = brd
        self._cb = loop.call_later(self._value, self.timeout_callback)
        self._exp = self._cb.when()
        self._task = None

    async def deactivate(self, brd):
        # Cancel the pending timer and any running timeout task.  The
        # caller must hold brd.lock.
        assert brd.lock.locked()

        if self._cb is not None:
            self._cb.cancel()
            self._cb = None

        if self._task is not None:
            self._task.cancel()

            # awaiting on a canceled task blocks, spin the
            # loop and make sure it was cancelled
            await asyncio.sleep(0)
            assert self._task.cancelled()
            self._task = None

        self._exp = None

    @_tbprinter
    async def timeout_coro(self):
        # Body of the task started when the timer fires: take the
        # board lock and release the board.
        # NOTE(review): the print() calls look like leftover debug
        # output -- confirm before removing.
        print('tc1')
        async with self._brd.lock:
            print('tc2')
            await self._brd.release()
        print('tc3')

    def timeout_callback(self):
        # Called by the event loop when the timer expires.  The work
        # needs to await, so it is handed off to a task.
        self._task = asyncio.create_task(self.timeout_coro())
-
class EtherIface(DefROAttribute):
    '''Read only attribute holding the name of an ethernet interface.

    On activation, runs "ifconfig <value> vnet <board name>".
    '''

    defattrname = 'eiface'

    async def activate(self, brd):
        # All output of ifconfig is discarded, only the exit status
        # is looked at.
        proc = await asyncio.create_subprocess_exec(
            'ifconfig', self._value, 'vnet', brd.name,
            stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL)

        status = await proc.wait()

        if status:
            raise RuntimeError('activate failed: %d' % status)
-
class SerialConsole(DefROAttribute):
    '''Read only attribute holding the path of a serial console
    device.

    On activation, runs a devfs "unhide" rule against the board's
    devfspath for the device's base name and for "<base name>.*".
    '''

    defattrname = 'console'

    async def activate(self, brd):
        base = os.path.basename(self._value)

        # Stops at the first devfs invocation that fails.
        for pattern in (base, base + '.*'):
            proc = await asyncio.create_subprocess_exec(
                'devfs', '-m', brd.attrs['devfspath'], 'rule',
                'apply', 'path', pattern, 'unhide',
                stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL)

            status = await proc.wait()

            if status:
                raise RuntimeError('activate failed: %d' % status)
-
class BoardImpl:
    '''State of a single board.

    name is the board's name, brdclass the name of its class, and
    options a sequence of (attribute class, kwargs) pairs.  Each
    pair is instantiated and stored in attrmap under the instance's
    defattrname.  Two options with the same defattrname raise
    ValueError.

    lock protects the reservation state.  The methods that assert on
    it must be called with the lock held.
    '''

    def __init__(self, name, brdclass, options):
        self.name = name
        self.brdclass = brdclass
        self.options = options
        self.reserved = False
        self.attrmap = {}
        self.lock = asyncio.Lock()

        for optcls, optkwargs in options:
            attr = optcls(**optkwargs)
            attrname = attr.defattrname
            if attrname in self.attrmap:
                raise ValueError(
                    'attribute name %s duplicated' %
                    repr(attrname))
            self.attrmap[attrname] = attr

        # last known values, see update() and add_info()
        self.attrcache = {}

    def __repr__(self): #pragma: no cover
        return repr(Board.from_orm(self))

    async def reserve(self):
        '''Mark the board as reserved.  Lock must be held, and the
        board must not already be reserved.'''

        assert self.lock.locked() and not self.reserved

        self.reserved = True

    async def release(self):
        '''Mark the board as no longer reserved.  Lock must be held,
        and the board must be reserved.'''

        assert self.lock.locked() and self.reserved

        self.reserved = False

    async def update_attrs(self, **attrs):
        '''Set each named attribute via its setvalue, caching what
        setvalue returns.  Lock must be held and the board reserved.'''

        assert self.lock.locked() and self.reserved

        for attrname, newvalue in attrs.items():
            self.attrcache[attrname] = \
                await self.attrmap[attrname].setvalue(newvalue)

    async def update(self):
        '''Refresh the cache from every attribute's getvalue.'''

        for attrname, attr in self.attrmap.items():
            self.attrcache[attrname] = await attr.getvalue()

    async def activate(self):
        '''Activate every attribute, in attrmap order.  Lock must be
        held and the board reserved.'''

        assert self.lock.locked() and self.reserved

        for attr in self.attrmap.values():
            await attr.activate(self)

    async def deactivate(self):
        '''Deactivate every attribute, in attrmap order.  Lock must
        be held and the board reserved.'''

        assert self.lock.locked() and self.reserved

        for attr in self.attrmap.values():
            await attr.deactivate(self)

    def add_info(self, d):
        '''Merge the mapping d into the cached attributes.'''

        self.attrcache.update(d)

    def clean_info(self):
        '''Drop cached entries that do not belong to an attribute
        in attrmap (that is, those added via add_info).'''

        # clean up attributes
        extras = [ k for k in self.attrcache if k not in self.attrmap ]
        for k in extras:
            del self.attrcache[k]

    @property
    def attrs(self):
        '''A copy of the cached attribute values.'''

        return dict(self.attrcache)
-
@dataclass
class BITEError(Exception):
    '''Exception carrying a structured error and the HTTP status to
    report it with.

    The handler registered in getApp turns it into a JSON response
    built from errobj.dict() with status_code as the status.
    '''

    # Error object describing what went wrong
    errobj: Error
    # HTTP status code for the response
    status_code: int
-
class BoardManager(object):
    '''Holds the board classes and the boards that are known.

    cls_info maps a class name to a dict of information about that
    class.  boards is a sequence of dicts, each holding the keyword
    arguments for BoardImpl.
    '''

    # maps the 'cls' key of an option in the config to its class
    _option_map = {
        'etheriface': EtherIface,
        'serialconsole': SerialConsole,
        'snmppower': SNMPPower,
    }

    def __init__(self, cls_info, boards):
        # add the name to the classes
        self.board_class_info = {
            clsname: dict(clsname=clsname, **info)
            for clsname, info in cls_info.items()
        }

        # boards, indexed by name
        impls = [ BoardImpl(**desc) for desc in boards ]
        self.boards = dict(**{ brd.name: brd for brd in impls })

    @classmethod
    def from_settings(cls, settings):
        '''Create a manager from the file named by
        settings.board_conf.'''

        return cls.from_ucl(settings.board_conf)

    @classmethod
    def from_ucl(cls, fname):
        '''Create a manager from the UCL file fname.

        The file provides 'classes' and 'boards'.  Each entry of a
        board's 'options' list is a dict whose 'cls' key selects the
        option class via _option_map.  The remaining keys are the
        keyword arguments for that class.
        '''

        with open(fname) as fp:
            conf = ucl.load(fp.read())

        def makeopt(optdesc):
            optcls = cls._option_map[optdesc['cls']]
            optkwargs = { k: v for k, v in optdesc.items() if k != 'cls' }

            return (optcls, optkwargs)

        classes = conf['classes']
        brds = conf['boards']

        # replace the option descriptions in place
        for brd in brds:
            options = brd['options']
            options[:] = [ makeopt(x) for x in options ]

        return cls(classes, brds)

    def classes(self):
        '''Return the class information, indexed by class name.'''

        return self.board_class_info
-
def unhashable_lru():
    '''Return a decorator that caches a function's results, keyed on
    the identity (id) of its arguments instead of their hash.

    This allows caching for arguments that are not hashable.  Each
    decorated function gets its own cache.  The cache keeps a
    reference to the arguments and is never pruned.
    '''

    def newwrapper(fun):
        cache = {}

        @wraps(fun)
        def wrapper(*args, **kwargs):
            key = (
                tuple(map(id, args)),
                tuple(sorted((name, id(val)) for name, val in
                    kwargs.items())),
            )

            try:
                seenargs, seenkwargs, res = cache[key]
            except KeyError:
                pass
            else:
                # make sure these really are the same objects
                sameargs = all(a is b for a, b in zip(args, seenargs))
                if sameargs and all(seenkwargs[name] is kwargs[name]
                    for name in seenkwargs):
                    return res

            res = fun(*args, **kwargs)
            cache[key] = (args, kwargs, res)

            return res

        return wrapper

    return newwrapper
-
class BiteAuth(Auth):
    '''Auth class that authenticates requests with a bearer token.

    token is the API key to send.  Two BiteAuth objects compare
    equal when their tokens are equal.
    '''

    def __init__(self, token):
        self.token = token

    def __eq__(self, o):
        # Objects without a token (None, mock.ANY, other Auth classes)
        # used to raise AttributeError here.  Returning NotImplemented
        # lets Python try the other operand's __eq__, and fall back to
        # "not equal" when that does not know either.
        try:
            return self.token == o.token
        except AttributeError:
            return NotImplemented

    def auth_flow(self, request):
        '''Add the Authorization header to request and yield it.'''

        request.headers['Authorization'] = 'Bearer ' + self.token
        yield request
-
- # how to get coverage for this?
@lru_cache()
def get_settings(): # pragma: no cover
    '''Return the Settings object for the application.

    Because of lru_cache, config.Settings() is only constructed on the
    first call.  Later calls return that same object.
    '''

    return config.Settings()
-
- # how to get coverage for this?
@unhashable_lru()
def get_data(settings: config.Settings = Depends(get_settings)):
    '''Return the orm wrapper for the sqlite database named by
    settings.db_file.

    The result is cached per settings object by unhashable_lru.
    '''

    #print(repr(settings))
    dburl = 'sqlite:///' + settings.db_file

    return make_orm(data.databases.Database(dburl))
-
async def real_get_boardmanager(settings, data):
    '''Create the BoardManager from settings, and remove every
    BoardStatus entry from the database.

    Returns the new BoardManager.
    '''

    brdmgr = BoardManager.from_settings(settings)

    # Clean up the database
    # XXX - This isn't a complete fix, we need a better solution.
    entries = await data.BoardStatus.objects.all()
    await asyncio.gather(*[ entry.delete() for entry in entries ])

    return brdmgr
-
# The one BoardManager shared by all requests.  It is created on first
# use, and _global_lock makes sure that it only gets created once.
_global_lock = asyncio.Lock()
_global_brdmgr = None

async def get_boardmanager(settings: config.Settings = Depends(get_settings),
    data: data.DataWrapper = Depends(get_data)):
    '''Return the shared BoardManager, creating it via
    real_get_boardmanager if it does not exist yet.'''

    global _global_brdmgr

    if _global_brdmgr is None:
        async with _global_lock:
            # check again, someone else may have created it while
            # we were waiting for the lock
            if _global_brdmgr is None:
                _global_brdmgr = await real_get_boardmanager(settings,
                    data)

    return _global_brdmgr
-
- oauth2_scheme = OAuth2PasswordBearer(tokenUrl='/nonexistent')
-
def get_authorized_board_parms(board_id, token: str = Depends(oauth2_scheme),
    data: data.DataWrapper = Depends(get_data),
    brdmgr: BoardManager = Depends(get_boardmanager)):
    '''Dependency that gathers the arguments the validate_board_params
    context manager needs.

    Returns them as a dict, so that it can be passed on using **.
    '''

    return {
        'board_id': board_id,
        'token': token,
        'data': data,
        'brdmgr': brdmgr,
    }
-
@contextlib.asynccontextmanager
async def validate_board_params(board_id, data, brdmgr, user=None, token=None):
    '''Context manager that makes sure the request is authorized for
    board_id, that is, that the board is reserved by the user, or the
    connection came from the board's jail (TBI).

    If user is None, it is looked up from token.  The board is
    yielded with its lock held for the duration of the with block.

    Raises BITEError with status 400 if the board is not reserved,
    and with status 403 if it is reserved by a different user.
    '''

    brd = brdmgr.boards[board_id]

    async with brd.lock:
        if user is None:
            user = await lookup_user(token, data)

        try:
            brduser = await data.BoardStatus.objects.get(board=board_id)
        except orm.exceptions.NoMatch:
            err = Error(error='Board not reserved.',
                board=Board.from_orm(brd))
            raise BITEError(status_code=HTTP_400_BAD_REQUEST, errobj=err)

        if user != brduser.user:
            err = Error(error='Board reserved by %s.' % repr(brduser.user),
                board=Board.from_orm(brd))
            raise BITEError(status_code=HTTP_403_FORBIDDEN, errobj=err)

        yield brd
-
async def lookup_user(token: str = Depends(oauth2_scheme),
    data: data.DataWrapper = Depends(get_data)):
    '''Return the user that the API key token belongs to.

    Raises HTTPException with status 401 if the key is not known.
    '''

    try:
        apikey = await data.APIKey.objects.get(key=token)

        return apikey.user
    except orm.exceptions.NoMatch:
        raise HTTPException(
            status_code=HTTP_401_UNAUTHORIZED,
            detail='Invalid authentication credentials',
            headers={'WWW-Authenticate': 'Bearer'},
        )
-
- router = APIRouter()
-
def board_priority(request: Request):
    '''Return the 'server' entry of the request's ASGI scope.'''

    # Get the board, if any, from the connection
    return request.scope['server']
-
@router.get('/board/classes', response_model=Dict[str, BoardClassInfo])
async def get_board_classes(user: str = Depends(lookup_user),
    brdmgr: BoardManager = Depends(get_boardmanager)):
    # Return the board classes, indexed by class name.  The user
    # dependency is only there to require a valid API key.
    return brdmgr.classes()
-
@router.get('/board/{board_id}', response_model=Board)
async def get_board_info(board_id, user: str = Depends(lookup_user),
    brdmgr: BoardManager = Depends(get_boardmanager)):
    # Return the current information for the board board_id.  The user
    # dependency is only there to require a valid API key.
    # NOTE(review): an unknown board_id raises KeyError here -- confirm
    # how that is meant to be reported to the client.
    brd = brdmgr.boards[board_id]

    # refresh the cached attribute values before returning them
    await brd.update()

    return brd
-
@router.post('/board/{board_id_or_class}/reserve', response_model=Union[Board, Error])
async def reserve_board(board_id_or_class,
    req: Request,
    user: str = Depends(lookup_user),
    brdmgr: BoardManager = Depends(get_boardmanager),
    settings: config.Settings = Depends(get_settings),
    sshpubkey: str = Body(embed=True, default=None,
        title='Default public ssh key to install.'),
    data: data.DataWrapper = Depends(get_data)):
    # Reserve a board for the authenticated user.
    #
    # Records the reservation in the database, runs the setup script
    # with the arguments "reserve <board name> <user> [<sshpubkey>]",
    # merges the JSON the script prints on stdout into the board's
    # attributes and activates the board.
    #
    # Raises BITEError with status 409 if the board is already
    # reserved, and with status 500 if the setup script exits non-zero.
    #
    # Despite the name of the path parameter, only a board id is
    # handled below, a class name is not.

    #print('reserve:', repr(sshpubkey), repr(await req.body()))
    board_id = board_id_or_class
    brd = brdmgr.boards[board_id]

    async with brd.lock:
        try:
            obrdreq = await data.BoardStatus.objects.create(board=board_id,
                user=user)
            # XXX - There is a bug in orm where the returned
            # object has an incorrect board value
            # see: https://github.com/encode/orm/issues/47
            #assert obrdreq.board == board_id and \
            #    obrdreq.user == user
            brdreq = await data.BoardStatus.objects.get(board=board_id,
                user=user)
            await brd.reserve()
        # XXX - orm isn't doing it's job here
        except sqlite3.IntegrityError:
            # the create failed, so there already is an entry for
            # this board
            raise BITEError(
                status_code=HTTP_409_CONFLICT,
                errobj=Error(error='Board currently reserved.',
                board=Board.from_orm(brd)),
            )

        # Initialize board
        try:
            args = ( settings.setup_script, 'reserve',
                brd.name, user, )
            if sshpubkey is not None:
                args += (sshpubkey, )
            sub = await asyncio.create_subprocess_exec(*args,
                stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            stdout, stderr = await sub.communicate()
            if sub.returncode:
                # carries (returncode, stderr) to the handler below
                raise RuntimeError(sub.returncode, stderr)
        except Exception as e:
            # Undo the reservation on any failure.
            await brdreq.delete()
            await brd.release()
            if isinstance(e, RuntimeError):
                # NOTE(review): this assumes that the RuntimeError is
                # the one raised above.  One raised elsewhere with a
                # different number of args would fail to unpack.
                retcode, stderr = e.args
                raise BITEError(
                    status_code=HTTP_500_INTERNAL_SERVER_ERROR,
                    errobj=Error(error=
                    'Failed to init board, ret: %d, stderr: %s' %
                    (retcode, repr(stderr)),
                    board=Board.from_orm(brd)),
                )
            raise

        # NOTE(review): a failure from here on (bad JSON from the
        # script, activate raising) leaves the board reserved.
        brd.add_info(json.loads(stdout))

        await brd.activate()

        await log_event('reserve', user=user, board=brd)

    await brd.update()

    return brd
-
class HandleExec(WSFWDServer):
    '''WSFWDServer that runs a command in a board's jail and forwards
    its stdin and stdout over the connection.

    board_id, data and brdmgr are keyword only.  The remaining
    arguments are passed on to WSFWDServer.

    The client has to authenticate (handle_auth) before it can
    request the command (handle_exec), and only one command can be
    run per connection.  Presumably the handle_* methods are
    dispatched by WSFWDServer based on the command name -- confirm
    against wsfwd.
    '''

    def __init__(self, *args, board_id, data, brdmgr, **kwargs):
        super().__init__(*args, **kwargs)

        self._board_id = board_id
        self._data = data
        self._brdmgr = brdmgr
        # user from a successful handle_auth, None until then
        self._auth_user = None
        # set once a command has been started
        self._did_exec = False

        # set when the command is done, see process_proc_wait
        self._finish_handler = asyncio.Event()

    async def handle_auth(self, msg):
        # Authenticate using the bearer token in msg['auth'].  Any
        # failure gets reported as RuntimeError('invalid token').
        try:
            user = await lookup_user(msg['auth']['bearer'],
                self._data)
        except Exception:
            raise RuntimeError('invalid token')

        self._auth_user = user

    async def shutdown(self):
        pass

    async def process_stdin(self, data):
        # Stream handler for the stdin stream: pass the data on to
        # the process.
        stdin = self._proc.stdin
        stdin.write(data)
        await stdin.drain()

    async def process_stdout(self):
        # Forward the output of the process to the stdout stream until
        # EOF, then tell the other side that the channel is closed.
        stdout = self._proc.stdout
        stream = self._stdout_stream

        try:
            while True:
                data = await stdout.read(16384)
                if not data:
                    break
                self.sendstream(stream, data)
                await self.drain(stream)
        finally:
            await self.sendcmd(dict(cmd='chanclose', chan=stream))

    async def process_proc_wait(self):
        # Wait for process to exit
        code = await self._proc.wait()
        await self.sendcmd(dict(cmd='exit', code=code))

        # Make sure that all stdout is sent
        await self._stdout_task

        # and that the client has closed stdin (see handle_chanclose)
        await self._stdin_event.wait()

        self._finish_handler.set()

    async def handle_chanclose(self, msg):
        # The client closed its stdin stream, so close the stdin of
        # the process as well.
        self.clear_stream_handler(self._stdin_stream)
        self._proc.stdin.close()
        await self._proc.stdin.wait_closed()
        self._stdin_event.set()

    async def handle_exec(self, msg):
        # Start "jexec <board id> <args>" for an authenticated user
        # that has the board reserved.  msg['args'] is the command,
        # msg['stdin'] and msg['stdout'] are the stream ids to use.
        #
        # Raises RuntimeError if a command was already started, if
        # not authenticated, or if the board check fails.
        if self._did_exec:
            raise RuntimeError('already did exec')

        if self._auth_user is None:
            raise RuntimeError('not authenticated')

        try:
            # the board lock is only held while starting the process
            async with validate_board_params(self._board_id, self._data,
                self._brdmgr, user=self._auth_user) as brd:
                self._proc = await \
                    asyncio.create_subprocess_exec('jexec',
                    self._board_id, *msg['args'],
                    stdin=subprocess.PIPE,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.STDOUT)
        except BITEError as e:
            raise RuntimeError(e.errobj.error)

        self._did_exec = True

        self._stdin_stream = msg['stdin']
        self._stdout_stream = msg['stdout']

        # handle stdin
        self._stdin_event = asyncio.Event()
        self.add_stream_handler(msg['stdin'], self.process_stdin)

        # handle stdout
        self._stdout_task = asyncio.create_task(self.process_stdout())

        # handle process exit
        self._proc_wait_task = asyncio.create_task(self.process_proc_wait())

    async def get_finish_handler(self):
        # Wait until the command has finished.
        return await self._finish_handler.wait()
-
@router.websocket("/board/{board_id}/exec")
async def board_exec_ws(
    board_id,
    websocket: WebSocket,
    brdmgr: BoardManager = Depends(get_boardmanager),
    settings: config.Settings = Depends(get_settings),
    data: data.DataWrapper = Depends(get_data)):
    # WebSocket end point for running a command on a board.  The
    # protocol itself, including authentication, is handled by
    # HandleExec.  This only connects it to the WebSocket and waits
    # for it to finish.
    await websocket.accept()
    try:
        async with HandleExec(websocket.receive_bytes,
            websocket.send_bytes, data=data,
            board_id=board_id, brdmgr=brdmgr) as server:
            await server.get_finish_handler()
    finally:
        # always close the WebSocket, even on errors
        await websocket.close()
-
@router.post('/board/{board_id}/release', response_model=Union[Board, Error])
async def release_board(board_id, user: str = Depends(lookup_user),
    brdmgr: BoardManager = Depends(get_boardmanager),
    settings: config.Settings = Depends(get_settings),
    data: data.DataWrapper = Depends(get_data)):
    # Release a board reserved by the authenticated user.
    #
    # Deactivates the board, runs the setup script with the arguments
    # "release <board name> <user>", removes the reservation from the
    # database and drops the attributes the reserve script added.
    #
    # Raises BITEError with status 400 if the board is not reserved,
    # with status 403 if it is reserved by another user, and with
    # status 500 if the setup script exits non-zero.
    brd = brdmgr.boards[board_id]

    async with brd.lock:
        # XXX - how to handle a release error?
        # NOTE(review): the event is logged before any of the checks
        # below, so failed and unauthorized attempts get logged as a
        # release too.
        await log_event('release', user=user, board=brd)

        try:
            brduser = await data.BoardStatus.objects.get(board=board_id)
            if user != brduser.user:
                raise BITEError(
                    status_code=HTTP_403_FORBIDDEN,
                    errobj=Error(error='Board reserved by %s.' % repr(brduser.user),
                    board=Board.from_orm(brd)))

        except orm.exceptions.NoMatch:
            raise BITEError(
                status_code=HTTP_400_BAD_REQUEST,
                errobj=Error(error='Board not reserved.',
                board=Board.from_orm(brd)),
            )

        await brd.deactivate()

        # Pass the selected board attributes, when present, to the
        # script via its environment.
        env = os.environ.copy()
        addkeys = { 'iface', 'ip', 'devfsrule', 'devfspath' }
        env.update((k, brd.attrs[k]) for k in addkeys if k in brd.attrs)

        sub = await asyncio.create_subprocess_exec(
            settings.setup_script, 'release', brd.name, user, env=env,
            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        stdout, stderr = await sub.communicate()
        retcode = sub.returncode
        if retcode:
            # On failure the board stays reserved, but it has already
            # been deactivated above.
            logging.error('release script failure: ' +
                'board: %s, ret: %s, stderr: %s' % (repr(brd.name),
                retcode, repr(stderr)))
            raise BITEError(
                status_code=HTTP_500_INTERNAL_SERVER_ERROR,
                errobj=Error(error=
                'Failed to release board, ret: %d, stderr: %s' %
                (retcode, repr(stderr)),
                board=Board.from_orm(brd)),
            )

        await data.BoardStatus.delete(brduser)
        await brd.release()

        brd.clean_info()

    await brd.update()

    return brd
-
@router.post('/board/{board_id}/attrs', response_model=Union[Board, Error])
async def set_board_attrs(
    attrs: Dict[str, Any],
    brdparams: dict = Depends(get_authorized_board_parms)):
    # Set attributes on a board.  attrs maps attribute names to their
    # new values.  validate_board_params makes sure that the board is
    # reserved by the requesting user, and holds the board's lock
    # while the attributes are updated.

    async with validate_board_params(**brdparams) as brd:
        await brd.update_attrs(**attrs)

    return brd
-
@router.get('/board/',response_model=Dict[str, Board])
async def get_boards(user: str = Depends(lookup_user),
    brdmgr: BoardManager = Depends(get_boardmanager)):
    # Return all the boards, indexed by name, after refreshing the
    # attribute values of each one.
    boards = brdmgr.boards
    for brd in boards.values():
        await brd.update()

    return boards
-
@router.get('/')
async def root_test(board_prio: dict = Depends(board_priority),
    settings: config.Settings = Depends(get_settings)):
    # Test end point, returns a fixed value plus whatever
    # board_priority returned for the request.
    # NOTE(review): board_prio is annotated as dict, but
    # board_priority returns scope['server'] -- confirm the type.
    return { 'foo': 'bar', 'board': board_prio }
-
def getApp():
    '''Create the FastAPI application.

    The application has the routes of router included, and turns any
    BITEError raised into a JSON response made from the exception's
    errobj and status_code.
    '''

    async def error_handler(request, exc):
        return JSONResponse(exc.errobj.dict(), status_code=exc.status_code)

    app = FastAPI()
    app.include_router(router)
    app.add_exception_handler(BITEError, error_handler)

    return app
-
- # uvicorn can't call the above function, while hypercorn can
- #app = getApp()
-
class TestUnhashLRU(unittest.TestCase):
    '''Tests for the unhashable_lru decorator.'''

    def test_unhashlru(self):
        # two distinct, unhashable, but equal arguments
        first = []
        second = []

        # that a wrapped function
        cached = unhashable_lru()(lambda x: object())

        # handles unhashable objects
        firstres = cached(first)
        secondres = cached(second)

        # that they return the same object again
        for arg, res in ((first, firstres), (second, secondres)):
            self.assertIs(res, cached(arg))

        # that the object returned is not the same
        self.assertIsNot(cached(first), cached(second))

        # that a second wrapped function
        othercached = unhashable_lru()(lambda x: object())

        # does not return the same object as the first cache
        self.assertIsNot(cached(first), othercached(first))
-
class TestCommon(unittest.IsolatedAsyncioTestCase):
    '''Common setup for the tests that need the application.

    Provides self.app, with its settings, data and board manager
    dependencies replaced by self.settings, self.data (backed by a
    temporary sqlite database) and self.brdmgr.
    '''

    def get_settings_override(self):
        return self.settings

    def get_data_override(self):
        return self.data

    def get_boardmanager_override(self):
        return self.brdmgr

    async def asyncSetUp(self):
        self.app = getApp()

        # setup test database
        self.dbtempfile = tempfile.NamedTemporaryFile()
        dbfname = self.dbtempfile.name
        self.database = data.databases.Database('sqlite:///' + dbfname)
        self.data = make_orm(self.database)

        await data._setup_data(self.data)

        # setup settings
        self.settings = config.Settings(
            db_file=dbfname,
            setup_script='somesetupscript',
            board_conf=os.path.join('fixtures', 'board_conf.ucl'),
        )

        self.brdmgr = BoardManager.from_settings(self.settings)

        # have the app use the above instead of the real ones
        self.app.dependency_overrides.update({
            get_settings: self.get_settings_override,
            get_data: self.get_data_override,
            get_boardmanager: self.get_boardmanager_override,
        })
-
- # This is a different class than the other tests, as at the time of
- # writing, there is no async WebSocket client that will talk directly
- # to an ASGI server. The websockets client library can talk to a unix
- # domain socket, so that is used.
class TestWebSocket(TestCommon):
    # Serves the app via hypercorn on a unix domain socket in a
    # temporary directory, so that the websockets client library can
    # connect to it.

    async def asyncSetUp(self):

        await super().asyncSetUp()

        d = os.path.realpath(tempfile.mkdtemp())
        self.basetempdir = d

        # setting this makes the server shut down, see asyncTearDown
        self.shutdown_event = asyncio.Event()

        self.socketpath = os.path.join(self.basetempdir, 'wstest.sock')

        # note: this local shadows the config module in this method
        config = Config()
        config.graceful_timeout = .01
        config.bind = [ 'unix:' + self.socketpath ]
        config.loglevel = 'ERROR'

        self.serv_task = asyncio.create_task(serve(self.app, config,
            shutdown_trigger=self.shutdown_event.wait))

        # get the unix domain socket connected
        # need a startup_trigger
        await asyncio.sleep(.01)

    async def asyncTearDown(self):
        self.app = None

        # stop the server and wait for it to finish before removing
        # the directory holding its socket
        self.shutdown_event.set()

        await self.serv_task

        shutil.rmtree(self.basetempdir)
        self.basetempdir = None

    @patch('asyncio.create_subprocess_exec')
    @timeout(2)
    async def test_exec_sshd(self, cse):
        # Walks the exec end point through: no auth, bad token, board
        # not reserved, board reserved by someone else, and then a
        # successful exec w/ data on stdin/stdout.

        # pass through wrapper, useful for tracing (see the print)
        def wrapper(corofun):
            async def foo(*args, **kwargs):
                r = await corofun(*args, **kwargs)
                #print('foo:', repr(corofun), repr((args, kwargs)), repr(r))
                return r

            return foo

        async with websockets.connect('ws://foo/board/cora-1/exec',
            path=self.socketpath) as websocket, \
            WSFWDClient(wrapper(websocket.recv), wrapper(websocket.send)) as client:
            mstdout = AsyncMock()

            cmdargs = [ 'sshd', '-i' ]
            # that w/o auth, it fails
            with self.assertRaises(RuntimeError):
                await client.exec(cmdargs, stdin=1, stdout=2)

            # that an invalid token fails
            with self.assertRaises(RuntimeError):
                await client.auth(dict(bearer='invalidtoken'))

            # that a valid auth token works
            await client.auth(dict(bearer='thisisanapikey'))

            # That since the board isn't reserved, it fails
            with self.assertRaisesRegex(RuntimeError,
                'Board not reserved.'):
                await client.exec([ 'sshd', '-i' ], stdin=1,
                    stdout=2)

            # that when the board is reserved by the wrong user
            brd = self.brdmgr.boards['cora-1']
            obrdreq = await self.data.BoardStatus.objects.create(
                board='cora-1', user='bar')
            async with brd.lock:
                await brd.reserve()

            # that it fails
            with self.assertRaisesRegex(RuntimeError, 'Board reserved by \'bar\'.'):
                await client.exec([ 'sshd', '-i' ], stdin=1, stdout=2)

            brduser = await self.data.BoardStatus.objects.get(board='cora-1')
            obrdreq = await self.data.BoardStatus.delete(brduser)

            # that when the board is reserved by the correct user
            obrdreq = await self.data.BoardStatus.objects.create(
                board='cora-1', user='foo')

            # the mocked process outputs echodata and exits w/ 0
            echodata = b'somedata'
            wrap_subprocess_exec(cse, stdout=echodata, retcode=0)

            client.add_stream_handler(2, mstdout)
            proc = await client.exec([ 'sshd', '-i' ], stdin=1, stdout=2)

            # that a second exec on the same connection fails
            with self.assertRaises(RuntimeError):
                await client.exec([ 'sshd', '-i' ], stdin=1, stdout=2)

            stdin, stdout = proc.stdin, proc.stdout

            stdin.write(echodata)
            await stdin.drain()

            # that we get our data
            self.assertEqual(await stdout.read(len(echodata)), echodata)

            # and that there is no more
            self.assertEqual(await stdout.read(len(echodata)), b'')

            # and we are truly at EOF
            self.assertTrue(stdout.at_eof())

            stdin.close()
            await stdin.wait_closed()

            await proc.wait()

            # that the command was run in the board's jail
            cse.assert_called_with('jexec', 'cora-1', *cmdargs,
                stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT)

            # spin things, not sure best way to handle this
            await asyncio.sleep(.01)

            # that closing our stdin closed the stdin of the process
            cse.return_value.stdin.close.assert_called_with()
-
- # Per RFC 5737 (https://tools.ietf.org/html/rfc5737):
- # The blocks 192.0.2.0/24 (TEST-NET-1), 198.51.100.0/24 (TEST-NET-2),
- # and 203.0.113.0/24 (TEST-NET-3) are provided for use in
- # documentation.
-
- # Note: this will not work under Python versions before 3.8, when
- # IsolatedAsyncioTestCase was added. The tearDown has to happen
- # with the event loop running, otherwise the task and other things
- # do not get cleaned up properly.
- class TestBiteLab(TestCommon):
- async def asyncSetUp(self):
- await super().asyncSetUp()
-
- self.client = AsyncClient(app=self.app,
- base_url='http://testserver')
-
- async def asyncTearDown(self):
- self.app = None
- await self.client.aclose()
- self.client = None
-
- async def test_basic(self):
- res = await self.client.get('/')
- self.assertNotEqual(res.status_code, HTTP_404_NOT_FOUND)
-
- async def test_notauth(self):
- # test that simple accesses are denied
- res = await self.client.get('/board/classes')
- self.assertEqual(res.status_code, HTTP_401_UNAUTHORIZED)
-
- res = await self.client.get('/board/')
- self.assertEqual(res.status_code, HTTP_401_UNAUTHORIZED)
-
- # test that invalid api keys are denied
- res = await self.client.get('/board/classes',
- auth=BiteAuth('badapikey'))
- self.assertEqual(res.status_code, HTTP_401_UNAUTHORIZED)
-
- async def test_classes(self):
- # that when requesting the board classes
- res = await self.client.get('/board/classes',
- auth=BiteAuth('thisisanapikey'))
-
- # it is successful
- self.assertEqual(res.status_code, HTTP_200_OK)
-
- # and returns the correct data
- self.assertEqual(res.json(), { 'cora-z7s': BoardClassInfo(**{
- 'arch': 'arm-armv7', 'clsname': 'cora-z7s', }) })
-
- @patch('bitelab.BoardImpl.deactivate')
- @patch('asyncio.create_subprocess_exec')
- @patch('bitelab.snmp.snmpget')
- @patch('logging.error')
- async def test_board_release_script_fail(self, le, sg, cse, bideact):
- # that when snmpget returns False
- sg.return_value = False
-
- # that when the setup script will fail
- wrap_subprocess_exec(cse, stderr=b'error', retcode=1)
-
- # that the cora-1 board is reserved
- data = self.data
- brd = self.brdmgr.boards['cora-1']
- attrs = dict(iface='a', ip='b', devfsrule='c')
- async with brd.lock:
- await brd.reserve()
- obrdreq = await data.BoardStatus.objects.create(
- board='cora-1', user='foo')
- brd.attrcache.update(attrs)
-
- # that when the correct user releases the board
- res = await self.client.post('/board/cora-1/release',
- auth=BiteAuth('thisisanapikey'))
-
- # it fails
- self.assertEqual(res.status_code, HTTP_500_INTERNAL_SERVER_ERROR)
-
- # and returns the correct data
- info = Error(error='Failed to release board, ret: 1, stderr: b\'error\'',
- board=Board(name='cora-1',
- brdclass='cora-z7s',
- reserved=True,
- attrs=attrs,
- ),
- ).dict()
- self.assertEqual(res.json(), info)
-
- # and that it called the release script
- env = os.environ.copy()
- env.update(attrs)
- cse.assert_called_with(self.settings.setup_script,
- 'release', 'cora-1', 'foo', env=env,
- stdout=subprocess.PIPE, stderr=subprocess.PIPE)
-
- # and that the error got logged
- le.assert_called_with('release script failure: board: \'cora-1\', ret: 1, stderr: b\'error\'')
-
@patch('bitelab.log_event')
@patch('bitelab.BoardImpl.deactivate')
@patch('bitelab.BoardImpl.activate')
@patch('asyncio.create_subprocess_exec')
@patch('bitelab.snmp.snmpget')
async def test_board_reserve_release(self, sg, cse, biact, bideact, le):
    # Walks the cora-1 board through a reserve/release cycle: a
    # release while unreserved, a reserve whose setup script fails,
    # a successful reserve, a second user's rejected reserve and
    # release, the owner's release, and a reserve by the second user.
    reserve_url = '/board/cora-1/reserve'
    release_url = '/board/cora-1/release'
    owner_key = 'thisisanapikey'
    other_key = 'anotherlongapikey'

    # keyword arguments every setup script invocation is expected to carry
    pipes = dict(stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    async def post(url, apikey, **kwargs):
        # authenticated POST; extra keywords (e.g. json=) pass through
        return await self.client.post(url, auth=BiteAuth(apikey),
                                      **kwargs)

    # releasing a board that is not reserved is a bad request
    resp = await post(release_url, other_key)
    self.assertEqual(resp.status_code, HTTP_400_BAD_REQUEST)

    # given snmpget returns False and a setup script that fails
    sg.return_value = False
    wrap_subprocess_exec(cse, stderr=b'error', retcode=1)

    # reserving the board is an internal server error
    resp = await post(reserve_url, owner_key)
    self.assertEqual(resp.status_code, HTTP_500_INTERNAL_SERVER_ERROR)

    # whose body carries the script failure and the unreserved board
    unreserved = Board(
        name='cora-1',
        brdclass='cora-z7s',
        reserved=False,
    )
    failure = Error(
        error='Failed to init board, ret: 1, stderr: b\'error\'',
        board=unreserved,
    ).dict()
    self.assertEqual(resp.json(), failure)

    # and the setup script was run (no ssh key argument)
    cse.assert_called_with(self.settings.setup_script, 'reserve',
                           'cora-1', 'foo', **pipes)

    # given a setup script that succeeds and prints these values as JSON
    script_attrs = {
        'ip': '192.0.2.10',
        'iface': 'epair0b',
        'devfsrule': '14',
        'devfspath': 'devpath',
    }
    wrap_subprocess_exec(cse, json.dumps(script_attrs).encode('utf-8'))

    keydata = 'pubsshkey'

    # reserving the board with an ssh public key succeeds
    resp = await post(reserve_url, owner_key,
                      json={'sshpubkey': keydata})
    self.assertEqual(resp.status_code, HTTP_200_OK)

    # and returns the board with the script's values as attributes
    brdinfo = Board(
        name='cora-1',
        brdclass='cora-z7s',
        reserved=True,
        attrs={'power': False, **script_attrs},
    ).dict()
    self.assertEqual(resp.json(), brdinfo)

    # the setup script received the ssh key as an extra argument
    cse.assert_called_with(self.settings.setup_script, 'reserve',
                           'cora-1', 'foo', 'pubsshkey', **pipes)

    # the board was activated
    biact.assert_called()

    # and the reservation was logged
    le.assert_called_with('reserve', user='foo',
                          board=self.brdmgr.boards['cora-1'])

    # a second user reserving the same board gets a conflict
    resp = await post(reserve_url, other_key)
    self.assertEqual(resp.status_code, HTTP_409_CONFLICT)
    self.assertEqual(resp.json(), {
        'error': 'Board currently reserved.',
        'board': brdinfo,
    })

    # a second user releasing the board is forbidden
    resp = await post(release_url, other_key)
    self.assertEqual(resp.status_code, HTTP_403_FORBIDDEN)
    self.assertEqual(resp.json(), {
        'error': 'Board reserved by \'foo\'.',
        'board': brdinfo,
    })

    # the reserving user releasing the board is allowed
    resp = await post(release_url, owner_key)
    self.assertEqual(resp.status_code, HTTP_200_OK)

    # and returns the board without the script provided attributes
    self.assertEqual(resp.json(), {
        'name': 'cora-1',
        'brdclass': 'cora-z7s',
        'reserved': False,
        'attrs': {'power': False},
    })

    # the release was logged
    le.assert_called_with('release', user='foo',
                          board=self.brdmgr.boards['cora-1'])

    # the release script ran with the script attributes added to the
    # process environment
    env = os.environ.copy()
    for key in ('ip', 'iface', 'devfsrule', 'devfspath'):
        env[key] = brdinfo['attrs'][key]

    cse.assert_called_with(self.settings.setup_script, 'release',
                           'cora-1', 'foo', env=env, **pipes)

    # and the attributes were deactivated
    bideact.assert_called()

    # finally the other user can now reserve the board
    resp = await post(reserve_url, other_key)
    self.assertEqual(resp.status_code, HTTP_200_OK)
-
@patch('bitelab.snmp.snmpget')
async def test_board_info(self, sg):
    # The board listing and the single board view both report the
    # power attribute as returned by snmpget.
    apikey = 'thisisanapikey'
    snmp_args = ('poe', 'pethPsePortAdminEnable.1.2', 'bool')

    def cora1(power):
        # expected JSON for the unreserved cora-1 board
        return {
            'name': 'cora-1',
            'brdclass': 'cora-z7s',
            'reserved': False,
            'attrs': {'power': power},
        }

    # given snmpget returns False
    sg.return_value = False

    # when listing all the boards
    listing = await self.client.get('/board/', auth=BiteAuth(apikey))

    # snmpget is queried w/ the correct args
    sg.assert_called_with(*snmp_args)

    # the request succeeds
    self.assertEqual(listing.status_code, HTTP_200_OK)

    # and the listing is keyed by board name
    self.assertEqual(listing.json(), {'cora-1': cora1(False)})

    # given snmpget returns True
    sg.return_value = True

    # when fetching just the one board
    single = await self.client.get('/board/cora-1',
                                   auth=BiteAuth(apikey))

    # snmpget is queried w/ the correct args
    sg.assert_called_with(*snmp_args)

    # the request succeeds
    self.assertEqual(single.status_code, HTTP_200_OK)

    # and the board shows as powered
    self.assertEqual(single.json(), cora1(True))
-
@patch('bitelab.snmp.snmpset')
async def test_board_attrs(self, ss):
    # Setting attributes needs a valid API key and a reserved board;
    # once both hold, the power attribute is written via snmpset.
    url = '/board/cora-1/attrs'
    apikey = 'thisisanapikey'
    snmp_target = ('poe', 'pethPsePortAdminEnable.1.2', 'bool')
    store = self.data

    def cora1(power):
        # expected JSON for the reserved cora-1 board
        return {
            'name': 'cora-1',
            'brdclass': 'cora-z7s',
            'reserved': True,
            'attrs': {'power': power},
        }

    # given snmpset returns False
    ss.return_value = False

    power_off = {'power': False}

    # an unknown API key fails authentication
    resp = await self.client.post(url, auth=BiteAuth('badapi'),
                                  json=power_off)
    self.assertEqual(resp.status_code, HTTP_401_UNAUTHORIZED)

    # a valid key on a board that is not reserved is a bad request
    resp = await self.client.post(url, auth=BiteAuth(apikey),
                                  json=power_off)
    self.assertEqual(resp.status_code, HTTP_400_BAD_REQUEST)

    # given the cora-1 board is reserved by user foo
    board = self.brdmgr.boards['cora-1']
    async with board.lock:
        await board.reserve()
        _ = await store.BoardStatus.objects.create(
            board='cora-1', user='foo')

    # turning the power off
    resp = await self.client.post(url, auth=BiteAuth(apikey),
                                  json=power_off)

    # is successful
    self.assertEqual(resp.status_code, HTTP_200_OK)

    # calls snmpset w/ the correct args
    ss.assert_called_with(*snmp_target, False)

    # and returns the correct data
    self.assertEqual(resp.json(), cora1(False))

    # given snmpset returns True
    ss.return_value = True

    power_on = {'power': True}

    # turning the power on
    resp = await self.client.post(url, auth=BiteAuth(apikey),
                                  json=power_on)

    # calls snmpset w/ the correct args
    ss.assert_called_with(*snmp_target, True)

    # is successful
    self.assertEqual(resp.status_code, HTTP_200_OK)

    # and returns the correct data
    self.assertEqual(resp.json(), cora1(True))
-
class TestBoardImpl(unittest.IsolatedAsyncioTestCase):
    # Checks that activating/deactivating a BoardImpl is passed on to
    # its attribute, using an autospec'd Attribute as a stand-in.

    async def test_activate(self):
        # given a board impl with a single mocked attribute
        attr_spec = (create_autospec, {'spec': Attribute})
        board = BoardImpl('foo', 'bar', [attr_spec])
        [attr] = board.attrmap.values()

        # when it is reserved and activated
        async with board.lock:
            await board.reserve()
            await board.activate()

        # the attribute was activated with the board
        attr.activate.assert_called_with(board)

    async def test_deactivate(self):
        # given a board impl with a single mocked attribute
        attr_spec = (create_autospec, {'spec': Attribute})
        board = BoardImpl('foo', 'bar', [attr_spec])
        [attr] = board.attrmap.values()

        # when it is reserved and deactivated
        async with board.lock:
            await board.reserve()
            await board.deactivate()

        # the attribute was deactivated with the board
        attr.deactivate.assert_called_with(board)
-
class TestLogEvent(unittest.IsolatedAsyncioTestCase):
    # Checks the JSON record log_event passes to logging.info, with
    # time.time patched so the date field is predictable.

    @patch('time.time')
    @patch('logging.info')
    async def test_log_event(self, li, tt):
        event = 'eslkjdf'
        username = 'weoijsdfkj'
        boardname = 'woied'
        extras = {'something': 2323, 'someelse': 'asdlfkj'}
        board = BoardImpl(boardname, {}, [])

        # given a fixed current time
        tt.return_value = 1607650392.384

        # when logging an event with a user, a board and extra data
        await log_event(event, user=username, board=board, extra=extras)

        # a single positional argument was logged
        logged = li.call_args[0]
        self.assertEqual(len(logged), 1)

        # and it parses as json into the complete record, the extra
        # data being merged in at the top level
        full_record = {
            'event': event,
            'board_name': boardname,
            'user': username,
            'date': '2020-12-11T01:33:12.384Z',
            **extras,
        }
        self.assertEqual(json.loads(logged[0]), full_record)

        # given a slightly later time
        tt.return_value = 1607650393.289

        # the record expected when there is no board/user
        bare_record = json.dumps({
            'event': event,
            'date': '2020-12-11T01:33:13.289Z',
        })

        # log_event handles no board/user
        await log_event(event)
        li.assert_called_with(bare_record)

        # log_event doesn't allow board/user to come from extra
        await log_event(event, extra={'board_name': 'sldkfj',
                                      'user': 'sod'})
        li.assert_called_with(bare_record)
-
class TestAttrs(unittest.IsolatedAsyncioTestCase):
    # Tests for the board attributes (SerialConsole, EtherIface,
    # TimeOut), for duplicate attribute detection, and for the
    # looptoutc/utctoloop time conversion helpers.

    @patch('asyncio.create_subprocess_exec')
    async def test_serialconsole(self, cse):
        # SerialConsole exposes a fixed value and, on activation,
        # unhides the device in the board's devfs mount.
        data = 'somepath'

        sctup = (SerialConsole, dict(val=data))

        brd = BoardImpl('foo', 'bar', [ sctup ])
        sc = brd.attrmap['console']

        self.assertEqual(sc.defattrname, 'console')

        self.assertEqual(data, await sc.getvalue())

        # that the value cannot be changed
        with self.assertRaises(TypeError):
            await sc.setvalue(data)

        devfspath = 'eifd'

        brd.add_info(dict(devfspath=devfspath))

        wrap_subprocess_exec(cse, retcode=0)

        await sc.activate(brd)

        # that the device itself is unhidden
        cse.assert_any_call('devfs', '-m', devfspath, 'rule',
            'apply', 'path', os.path.basename(await sc.getvalue()),
            'unhide',
            stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL)

        # and so are the names matching the device name plus '.*'
        cse.assert_any_call('devfs', '-m', devfspath, 'rule',
            'apply', 'path',
            os.path.basename(await sc.getvalue()) + '.*', 'unhide',
            stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL)

        # that a failing command makes activation raise
        wrap_subprocess_exec(cse, retcode=1)
        with self.assertRaises(RuntimeError):
            await sc.activate(brd)

    @patch('asyncio.create_subprocess_exec')
    async def test_etheriface(self, cse):
        # EtherIface exposes a fixed value and, on activation, hands
        # the interface to the board via ifconfig ... vnet.
        eiface = 'aneiface'

        eitup = EtherIface, dict(val=eiface)

        brd = BoardImpl('foo', 'bar', [ eitup ])

        ei = brd.attrmap['eiface']

        self.assertEqual(ei.defattrname, 'eiface')

        self.assertEqual(eiface, await ei.getvalue())

        # that the value cannot be changed
        with self.assertRaises(TypeError):
            await ei.setvalue('randomdata')

        wrap_subprocess_exec(cse, retcode=0)

        await ei.activate(brd)

        cse.assert_called_with('ifconfig', eiface, 'vnet', 'foo',
            stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL)

        # that a failing command makes activation raise
        wrap_subprocess_exec(cse, retcode=1)
        with self.assertRaises(RuntimeError):
            await ei.activate(brd)

    async def test_multipleattrs(self):
        attrs = [ (Power, dict()) ] * 2

        # That multiple attributes w/ same name raises ValueError
        with self.assertRaises(ValueError):
            BoardImpl('foo', 'bar', attrs)

    # Enough of this code depends upon the event loop using the
    # code in BaseEventLoop wrt scheduling that this is not a
    # terrible test.  If this fails, it is likely the selector
    # doesn't use a sane event loop.
    @patch('asyncio.BaseEventLoop.time')
    @patch('time.time')
    async def test_looptoutc(self, ttime, beltime):
        utctime = 19239
        belsrctime = 892934
        ttime.return_value = utctime
        beltime.return_value = belsrctime

        # that when given the current loop time, that
        # it returns the current utc time
        self.assertEqual(looptoutc(belsrctime), utctime)

        # then when an offset is applied (a fixed value, so that
        # the test is deterministic)
        offset = .000999 * 100000

        # the utc has the same offset
        # it'd be nice if this was exact, but it's not because
        # floating point.  9 places gets us nanosecond precision
        self.assertAlmostEqual(looptoutc(belsrctime + offset),
            utctime + offset, places=9)

        # make sure w/ the new code, it round trips
        sometime = 238974.34
        self.assertAlmostEqual(utctoloop(looptoutc(sometime)), sometime)
        self.assertAlmostEqual(looptoutc(utctoloop(sometime)), sometime)

    @timeout(2)
    @patch('asyncio.BaseEventLoop.time')
    @patch('time.time')
    async def test_timeout_vals(self, ttime, belt):
        # that a TimeOut with args
        totup = TimeOut, dict(val=10)

        # passed to a board w/ the totup
        brd = BoardImpl('foo', 'bar', [ totup ])

        to = brd.attrmap['timeout']

        with self.assertRaises(RuntimeError):
            # that setting the value when not activated errors
            await to.setvalue(234987)

        # that an update will populate the attrs.
        await brd.update()

        # and that the board attrs will be present
        # and contain the current timeout
        self.assertEqual(brd.attrs, dict(timeout=10))

        # that a given loop time
        looptime = 100.384
        belt.return_value = looptime

        # and a given UTC time (Thu Dec 10 14:06:35 UTC 2020)
        utctime = 1607609195.28
        ttime.return_value = utctime

        # that when reserved/activated
        async with brd.lock:
            await brd.reserve()
            await brd.activate()

        await brd.update()

        # That it returns timeout seconds in the future.
        self.assertEqual(brd.attrs,
            dict(timeout='2020-12-10T14:06:45.280Z'))

        with self.assertRaises(ValueError):
            # that setting it to a value farther into
            # the future fails
            await to.setvalue('2020-12-10T14:06:55.280Z')

        with self.assertRaises(ValueError):
            # that passing a non-Z ending (not UTC) date fails
            await to.setvalue('2020-12-10T14:06:55.28')

        # that setting it to a time slightly earlier
        await to.setvalue('2020-12-10T14:06:44.280Z')

        await brd.update()

        # That it returns that time
        self.assertEqual(brd.attrs,
            dict(timeout='2020-12-10T14:06:44.280Z'))

    @timeout(2)
    async def test_timeout(self):
        # that a TimeOut with args
        totup = TimeOut, dict(val=.01)

        # passed to a board w/ the totup
        brd = BoardImpl('foo', 'bar', [ totup ])

        to = brd.attrmap['timeout']

        # that when reserved/activated
        async with brd.lock:
            await brd.reserve()
            await brd.activate()

        # and the expiration time (plus a little) has passed
        evt = asyncio.Event()
        loop = asyncio.get_running_loop()
        loop.call_at(to._exp + epsilon, evt.set)
        await evt.wait()

        # that the board is no longer reserved
        self.assertFalse(brd.reserved)

        # that when reserved/activated/deactivated/released
        async with brd.lock:
            await brd.reserve()
            await brd.activate()
            exp = to._exp
            await brd.deactivate()
            await brd.release()

        # that the expiration is no longer there
        self.assertIsNone(to._exp)

        # and the timeout passes
        evt = asyncio.Event()
        loop = asyncio.get_running_loop()
        loop.call_at(exp + epsilon, evt.set)
        await evt.wait()

        # that when reserved/activated
        async with brd.lock:
            await brd.reserve()
            await brd.activate()

        # but the board is locked for some reason
        # (async with, so that a failed assertion below cannot
        # leave the lock held)
        async with brd.lock:
            # and the callback is called
            await asyncio.sleep(.02)

            # that the task has been scheduled
            self.assertIsNotNone(to._task)

            # that it can be deactivated
            await brd.deactivate()

        # and when the board lock is released
        # that the board was not released
        self.assertTrue(brd.reserved)
|