|
|
@@ -0,0 +1,154 @@ |
|
|
|
# Copyright 2020 John-Mark Gurney. |
|
|
|
# All rights reserved. |
|
|
|
# |
|
|
|
# Redistribution and use in source and binary forms, with or without |
|
|
|
# modification, are permitted provided that the following conditions |
|
|
|
# are met: |
|
|
|
# 1. Redistributions of source code must retain the above copyright |
|
|
|
# notice, this list of conditions and the following disclaimer. |
|
|
|
# 2. Redistributions in binary form must reproduce the above copyright |
|
|
|
# notice, this list of conditions and the following disclaimer in the |
|
|
|
# documentation and/or other materials provided with the distribution. |
|
|
|
# |
|
|
|
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND |
|
|
|
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
|
|
|
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
|
|
|
# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE |
|
|
|
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL |
|
|
|
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS |
|
|
|
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) |
|
|
|
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT |
|
|
|
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY |
|
|
|
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF |
|
|
|
# SUCH DAMAGE. |
|
|
|
# |
|
|
|
|
|
|
|
import aiohttp |
|
|
|
import asyncio |
|
|
|
import concurrent |
|
|
|
import os |
|
|
|
import shutil |
|
|
|
import tempfile |
|
|
|
import unittest |
|
|
|
|
|
|
|
from aiohttp import web |
|
|
|
from RainEagle.parse import LogDir as RELogDir, _cmaiter |
|
|
|
|
|
|
|
class SolarDataWS(object): |
|
|
|
def __init__(self, reprefix): |
|
|
|
self._raineagle = RELogDir(reprefix) |
|
|
|
|
|
|
|
async def websocket_handler(self, request): |
|
|
|
ws = web.WebSocketResponse() |
|
|
|
await ws.prepare(request) |
|
|
|
|
|
|
|
async def injector(): |
|
|
|
async with _cmaiter(self._raineagle.enditer()) as ei: |
|
|
|
while True: |
|
|
|
try: |
|
|
|
res = await ei.__anext__() |
|
|
|
await ws.send_str('ng %.4f' % res.load) |
|
|
|
except concurrent.futures._base.CancelledError: |
|
|
|
return |
|
|
|
|
|
|
|
injecttask = asyncio.create_task(injector()) |
|
|
|
|
|
|
|
async for msg in ws: |
|
|
|
if msg.type == aiohttp.WSMsgType.TEXT and msg.data == 'q': |
|
|
|
break |
|
|
|
else: |
|
|
|
print('unknown msg:', repr(msg)) |
|
|
|
|
|
|
|
injecttask.cancel() |
|
|
|
await injecttask |
|
|
|
await ws.close() |
|
|
|
|
|
|
|
return ws |
|
|
|
|
|
|
|
def get_routes(self): |
|
|
|
return [ |
|
|
|
web.get('/ws', self.websocket_handler), |
|
|
|
] |
|
|
|
|
|
|
|
# XXX - how to configure this properly |
|
|
|
sdws = SolarDataWS('./raineagle.') |
|
|
|
|
|
|
|
app = web.Application() |
|
|
|
app.add_routes(sdws.get_routes()) |
|
|
|
|
|
|
|
# https://stackoverflow.com/questions/23033939/how-to-test-python-3-4-asyncio-code |
|
|
|
# Slightly modified to timeout and to print trace back when canceled. |
|
|
|
# This makes it easier to figure out what "froze". |
|
|
|
def async_test(f): |
|
|
|
def wrapper(*args, **kwargs): |
|
|
|
async def tbcapture(): |
|
|
|
try: |
|
|
|
return await f(*args, **kwargs) |
|
|
|
except asyncio.CancelledError as e: # pragma: no cover |
|
|
|
# if we are going to be cancelled, print out a tb |
|
|
|
import traceback |
|
|
|
traceback.print_exc() |
|
|
|
raise |
|
|
|
|
|
|
|
loop = asyncio.get_event_loop() |
|
|
|
|
|
|
|
# timeout after 4 seconds |
|
|
|
loop.run_until_complete(asyncio.wait_for(tbcapture(), 4)) |
|
|
|
|
|
|
|
return wrapper |
|
|
|
|
|
|
|
class Test(unittest.TestCase): |
|
|
|
def setUp(self): |
|
|
|
# setup temporary directory |
|
|
|
d = os.path.realpath(tempfile.mkdtemp()) |
|
|
|
self.basetempdir = d |
|
|
|
self.tempdir = os.path.join(d, 'subdir') |
|
|
|
os.mkdir(self.tempdir) |
|
|
|
|
|
|
|
os.chdir(self.tempdir) |
|
|
|
sdws = SolarDataWS(os.path.join(self.tempdir, 'raineagle')) |
|
|
|
|
|
|
|
app = web.Application() |
|
|
|
app.add_routes(sdws.get_routes()) |
|
|
|
self._app = app |
|
|
|
|
|
|
|
def tearDown(self): |
|
|
|
#print('td:', time.time()) |
|
|
|
shutil.rmtree(self.basetempdir) |
|
|
|
self.tempdir = None |
|
|
|
|
|
|
|
@async_test |
|
|
|
async def test_daemon(self): |
|
|
|
loop = asyncio.get_event_loop() |
|
|
|
|
|
|
|
# launch the server |
|
|
|
webport = 58323 |
|
|
|
runner = web.AppRunner(self._app) |
|
|
|
await runner.setup() |
|
|
|
site = web.TCPSite(runner, 'localhost', webport) |
|
|
|
await site.start() |
|
|
|
|
|
|
|
with open(os.path.join(self.tempdir, 'raineagle.0.log'), 'w', buffering=1) as fp: |
|
|
|
fp.write('\t'.join([ 'm', '1578879268.23', '1578850464', 'Connected', '1.2740', '3.992000', 'kW', '85.575', '8.092', 'kWh' ]) + '\n') |
|
|
|
loop.call_later(.02, fp.write, '\t'.join([ 'm', '1578879269.23', '1578850465', 'Connected', '1.0000', '3.992000', 'kW', '85.575', '8.092', 'kWh' ]) + '\n') |
|
|
|
|
|
|
|
# Test the websocket |
|
|
|
r = [] |
|
|
|
try: |
|
|
|
async with aiohttp.ClientSession() as session, \ |
|
|
|
session.ws_connect('http://localhost:%d/ws' % |
|
|
|
webport) as ws: |
|
|
|
async for msg in ws: |
|
|
|
r.append(msg) |
|
|
|
if len(r) == 2: |
|
|
|
break |
|
|
|
|
|
|
|
await ws.send_str('q') |
|
|
|
|
|
|
|
self.assertEqual(r[0].type, aiohttp.WSMsgType.TEXT) |
|
|
|
self.assertEqual(r[0].data, 'ng 1.2740') |
|
|
|
|
|
|
|
self.assertEqual(r[1].type, aiohttp.WSMsgType.TEXT) |
|
|
|
self.assertEqual(r[1].data, 'ng 1.0000') |
|
|
|
finally: |
|
|
|
await runner.cleanup() |