Browse Source

add function run_on_modify to make usage a bit easier..

main
John-Mark Gurney 4 years ago
parent
commit
99650d3274
2 changed files with 88 additions and 0 deletions
  1. +24
    -0
      README.md
  2. +64
    -0
      aiokq/__init__.py

+ 24
- 0
README.md View File

@@ -14,6 +14,8 @@ Sample Usage

To watch a file for modification:
```
import aiokq

fp = open(fname)
async with aiokq.watch_file(fp) as wf:
while True:
@@ -28,3 +30,25 @@ The with symantics is required in order to address the race where a
write is issued between the registration and the time that you do the
read. There is the possibility that a wakeup happens and there are
no modifications due to this race.

If you have a function that needed to process a file, you can use
the run_on_modify function:
```
import aiokq
import asyncio

async def processfile(fp, state):
# process the file in fp
pass

fp = open(fname)
asyncio.create_task(aiokq.run_on_modify(fp, processfile, []))
```

The function processfile will be called once initially, to process
the file, and then again after each modification. As writes or
modifications may happen in blocks, it is entirely possible that
processfile will be called in the middle of an update. It is up
to the function to use file locking, or another mechanism (append
only) to ensure that the file is in a stable state during processing,
or simply return waiting for the next modification to complete.

+ 64
- 0
aiokq/__init__.py View File

@@ -35,6 +35,8 @@ import unittest
# XXX - this matches on both FreeBSD and Darwin
KQ_EV_RECEIPT = 0x40

__all__ = [ 'watch_file', 'run_on_modify', ]

class AsyncKqueue(object):
def __init__(self):
self._kq = select.kqueue()
@@ -82,15 +84,43 @@ class AsyncKqueue(object):

@contextlib.asynccontextmanager
async def watch_file(self, fp):
'''
A context manager, for watching for modification of a file.

Example usage:
fp = open(fname)
async with aiokq.watch_file(fp) as wf:
while True:
# do something with fp

await wf()
'''
try:
id, waitfun = self.addevent(fp, select.KQ_FILTER_VNODE, select.KQ_NOTE_EXTEND|select.KQ_NOTE_WRITE)
yield waitfun
finally:
self.removeevent(id)

async def run_on_modify(self, fp, fun, *args, **kwargs):
'''
The coroutine fun is run once at first, and then, when ever the
fp is modified, rerun the fun. The first argument to fun will
be the file like object.

Currently, it is just fp, but in the future if fp is a file
name, it is necessary to provide fun a different fp.
'''

async with self.watch_file(fp) as wf:
while True:
await fun(fp, *args, **kwargs)

await wf()

_globalkq = AsyncKqueue()

watch_file = _globalkq.watch_file
run_on_modify = _globalkq.run_on_modify

# https://stackoverflow.com/questions/23033939/how-to-test-python-3-4-asyncio-code
# Slightly modified to timeout and to print trace back when canceled.
@@ -128,6 +158,40 @@ class Tests(unittest.TestCase):
shutil.rmtree(self.basetempdir)
self.tempdir = None

@async_test
async def test_runonmodify(self):
loop = asyncio.get_event_loop()

res = []

sem = asyncio.Semaphore(0)

async def myfun(rfp, *args, **kwargs):
rfp.seek(0)
res.append((rfp, rfp.read(), args, kwargs))
sem.release()

with open('samplefile.txt', 'w+') as fp, open('samplefile.txt', 'r') as rfp:
loop.create_task(run_on_modify(rfp, myfun, 1, foo='bar'))

def fpwrflush(data):
fp.write(data)
fp.flush()

await sem.acquire()
self.assertEqual(res.pop(), (rfp, '', (1,), { 'foo': 'bar' }))

fpwrflush('foo')

await sem.acquire()
self.assertEqual(res.pop(), (rfp, 'foo', (1,), { 'foo': 'bar' }))

fpwrflush('foobar')

await sem.acquire()
self.assertEqual(res.pop(), (rfp, 'foofoobar', (1,), { 'foo': 'bar' }))


@async_test
async def test_filemod(self):
loop = asyncio.get_event_loop()


Loading…
Cancel
Save