From fd2766c05afbad78524c94af1c3fc65c2953892b Mon Sep 17 00:00:00 2001 From: John-Mark Gurney Date: Sun, 19 Jan 2020 23:21:46 -0800 Subject: [PATCH] add start of daemon to push data to the client --- setup.py | 1 + solardash/__init__.py | 154 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 155 insertions(+) diff --git a/setup.py b/setup.py index 9624af6..48b5eb2 100644 --- a/setup.py +++ b/setup.py @@ -14,6 +14,7 @@ setup(name='solardash', url='https://www.funkthat.com/gitea/jmg/solardash', packages=[ 'solardash', ], install_requires=[ + 'aiohttp', 'RainEagle @ git+https://www.funkthat.com/gitea/jmg/RainEagle.git', ], extras_require = { diff --git a/solardash/__init__.py b/solardash/__init__.py index e69de29..7435c7f 100644 --- a/solardash/__init__.py +++ b/solardash/__init__.py @@ -0,0 +1,154 @@ +# Copyright 2020 John-Mark Gurney. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# 1. Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# 2. Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND +# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS +# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) +# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY +# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF +# SUCH DAMAGE. +# + +import aiohttp +import asyncio +import concurrent +import os +import shutil +import tempfile +import unittest + +from aiohttp import web +from RainEagle.parse import LogDir as RELogDir, _cmaiter + +class SolarDataWS(object): + def __init__(self, reprefix): + self._raineagle = RELogDir(reprefix) + + async def websocket_handler(self, request): + ws = web.WebSocketResponse() + await ws.prepare(request) + + async def injector(): + async with _cmaiter(self._raineagle.enditer()) as ei: + while True: + try: + res = await ei.__anext__() + await ws.send_str('ng %.4f' % res.load) + except concurrent.futures._base.CancelledError: + return + + injecttask = asyncio.create_task(injector()) + + async for msg in ws: + if msg.type == aiohttp.WSMsgType.TEXT and msg.data == 'q': + break + else: + print('unknown msg:', repr(msg)) + + injecttask.cancel() + await injecttask + await ws.close() + + return ws + + def get_routes(self): + return [ + web.get('/ws', self.websocket_handler), + ] + +# XXX - how to configure this properly +sdws = SolarDataWS('./raineagle.') + +app = web.Application() +app.add_routes(sdws.get_routes()) + +# https://stackoverflow.com/questions/23033939/how-to-test-python-3-4-asyncio-code +# Slightly modified to timeout and to print trace back when canceled. +# This makes it easier to figure out what "froze". +def async_test(f): + def wrapper(*args, **kwargs): + async def tbcapture(): + try: + return await f(*args, **kwargs) + except asyncio.CancelledError as e: # pragma: no cover + # if we are going to be cancelled, print out a tb + import traceback + traceback.print_exc() + raise + + loop = asyncio.get_event_loop() + + # timeout after 4 seconds + loop.run_until_complete(asyncio.wait_for(tbcapture(), 4)) + + return wrapper + +class Test(unittest.TestCase): + def setUp(self): + # setup temporary directory + d = os.path.realpath(tempfile.mkdtemp()) + self.basetempdir = d + self.tempdir = os.path.join(d, 'subdir') + os.mkdir(self.tempdir) + + os.chdir(self.tempdir) + sdws = SolarDataWS(os.path.join(self.tempdir, 'raineagle')) + + app = web.Application() + app.add_routes(sdws.get_routes()) + self._app = app + + def tearDown(self): + #print('td:', time.time()) + shutil.rmtree(self.basetempdir) + self.tempdir = None + + @async_test + async def test_daemon(self): + loop = asyncio.get_event_loop() + + # launch the server + webport = 58323 + runner = web.AppRunner(self._app) + await runner.setup() + site = web.TCPSite(runner, 'localhost', webport) + await site.start() + + with open(os.path.join(self.tempdir, 'raineagle.0.log'), 'w', buffering=1) as fp: + fp.write('\t'.join([ 'm', '1578879268.23', '1578850464', 'Connected', '1.2740', '3.992000', 'kW', '85.575', '8.092', 'kWh' ]) + '\n') + loop.call_later(.02, fp.write, '\t'.join([ 'm', '1578879269.23', '1578850465', 'Connected', '1.0000', '3.992000', 'kW', '85.575', '8.092', 'kWh' ]) + '\n') + + # Test the websocket + r = [] + try: + async with aiohttp.ClientSession() as session, \ + session.ws_connect('http://localhost:%d/ws' % + webport) as ws: + async for msg in ws: + r.append(msg) + if len(r) == 2: + break + + await ws.send_str('q') + + self.assertEqual(r[0].type, aiohttp.WSMsgType.TEXT) + self.assertEqual(r[0].data, 'ng 1.2740') + + self.assertEqual(r[1].type, aiohttp.WSMsgType.TEXT) + self.assertEqual(r[1].data, 'ng 1.0000') + finally: + await runner.cleanup()