|
|
@@ -1,21 +1,48 @@ |
|
|
|
import collections |
|
|
|
import itertools |
|
|
|
import pathlib |
|
|
|
import shutil |
|
|
|
import tempfile |
|
|
|
import unittest |
|
|
|
from .utils import _asn1coder |
|
|
|
|
|
|
|
# Use standard decay formula, from: |
|
|
|
# https://en.wikipedia.org/wiki/Radioactive_decay#One-decay_process |
|
|
|
# |
|
|
|
# N(t) = N0 e^(-λt) |
|
|
|
# |
|
|
|
# And from earlier, they are defined as: |
|
|
|
# t1/2 (half-life) = ln(2) / λ |
|
|
|
# λ = ln(2) / t1/2 |
|
|
|
|
|
|
|
class TagCache: |
|
|
|
'''Takes tuples, and stores them in a cache where only |
|
|
|
the most count recent ones are kept.''' |
|
|
|
the most count recent and active ones are kept. |
|
|
|
|
|
|
|
count is the number of tags returned by the method tags. |
|
|
|
|
|
|
|
tags is the initial starting state of the cache. |
|
|
|
|
|
|
|
Current use is to create a starter object as such: |
|
|
|
cache = TagCache(), and modify count as needed: |
|
|
|
cache.count = 10 |
|
|
|
|
|
|
|
Use the store and load methods to write and read the |
|
|
|
cache to a specific file. |
|
|
|
''' |
|
|
|
|
|
|
|
def __init__(self, tags=(), count=10): |
|
|
|
self._cache = dict((x, None) for x in tags) |
|
|
|
def __init__(self, tags=(), count=10, **kwargs): |
|
|
|
self._cache = collections.OrderedDict((x, None) for x in tags) |
|
|
|
self._count = count |
|
|
|
self._modified = False |
|
|
|
if 'limit' in kwargs: |
|
|
|
self._limit = kwargs['limit'] |
|
|
|
else: |
|
|
|
self._limit = self._count |
|
|
|
|
|
|
|
def _limit_count(self): |
|
|
|
while len(self._cache) > self._count: |
|
|
|
del self._cache[next(iter(self._cache.keys()))] |
|
|
|
while len(self._cache) > self._limit: |
|
|
|
self._cache.popitem(last=False) |
|
|
|
|
|
|
|
@property |
|
|
|
def count(self): |
|
|
@@ -26,10 +53,23 @@ class TagCache: |
|
|
|
@count.setter |
|
|
|
def count(self, v): |
|
|
|
self._count = v |
|
|
|
if v > self._limit: |
|
|
|
self._limit = v |
|
|
|
|
|
|
|
self._modified = True |
|
|
|
|
|
|
|
self._limit_count() |
|
|
|
|
|
|
|
@property |
|
|
|
def limit(self): |
|
|
|
'''The number of total entries allowed in the cache. |
|
|
|
|
|
|
|
This is the number of stored entires, not the count |
|
|
|
returned by the tags method. |
|
|
|
''' |
|
|
|
|
|
|
|
return self._limit |
|
|
|
|
|
|
|
@property |
|
|
|
def modified(self): |
|
|
|
'''Return if the cache has been modified since the last |
|
|
@@ -55,7 +95,12 @@ class TagCache: |
|
|
|
def tags(self): |
|
|
|
'''Returns the sorted list of tags in the cache.''' |
|
|
|
|
|
|
|
return sorted(self._cache.keys()) |
|
|
|
return sorted(itertools.islice(self._cache.keys(), |
|
|
|
len(self._cache) - self._count, len(self._cache))) |
|
|
|
|
|
|
|
def __repr__(self): |
|
|
|
return 'TagCache(tags=%s, count=%d, limit=%d)' % \ |
|
|
|
(tuple(self._cache.keys()), self.count, self._limit) |
|
|
|
|
|
|
|
@classmethod |
|
|
|
def load(cls, fname): |
|
|
@@ -98,6 +143,80 @@ class _TestTagCache(unittest.TestCase): |
|
|
|
shutil.rmtree(self.basetempdir) |
|
|
|
self.tempdir = None |
|
|
|
|
|
|
|
@unittest.mock.patch('time.time', side_effect=lambda cnt= |
|
|
|
itertools.count(): cnt.next() * 1.) |
|
|
|
def test_halflife(self, tt): |
|
|
|
pass |
|
|
|
|
|
|
|
def test_limit(self): |
|
|
|
tc = TagCache(count=2, limit=3) |
|
|
|
|
|
|
|
tc.add(('foo', 'foo')) |
|
|
|
|
|
|
|
tc.add(('bar', 'bar')) |
|
|
|
|
|
|
|
tc.add(('foo', 'foo')) |
|
|
|
|
|
|
|
tc.add(('baz', 'baz')) |
|
|
|
|
|
|
|
self.assertEqual(tc.tags(), [ ('baz', 'baz'), ('foo', 'foo'), ]) |
|
|
|
|
|
|
|
# increasing the count |
|
|
|
tc.count = 3 |
|
|
|
|
|
|
|
# causes the limit to increase |
|
|
|
self.assertEqual(tc.limit, 3) |
|
|
|
|
|
|
|
# and the old one to appear |
|
|
|
self.assertEqual(tc.tags(), [ ('bar', 'bar'), ('baz', 'baz'), |
|
|
|
('foo', 'foo'), ]) |
|
|
|
|
|
|
|
# decreasing the count |
|
|
|
tc.count = 2 |
|
|
|
|
|
|
|
# that it can be stored |
|
|
|
cachefile = self.tempdir / 'somecache' |
|
|
|
tc.store(cachefile) |
|
|
|
|
|
|
|
# that it can be loaded |
|
|
|
ntc = TagCache.load(cachefile) |
|
|
|
|
|
|
|
def test_count(self): |
|
|
|
# test basic functionality |
|
|
|
tc = TagCache(count=2) |
|
|
|
|
|
|
|
tc.add(('foo', 'foo')) |
|
|
|
|
|
|
|
tc.add(('bar', 'bar')) |
|
|
|
|
|
|
|
tc.add(('baz', 'baz')) |
|
|
|
|
|
|
|
self.assertEqual(tc.tags(), [ ('bar', 'bar'), ('baz', 'baz') ]) |
|
|
|
|
|
|
|
# that the count can be modified |
|
|
|
tc.count = 3 |
|
|
|
|
|
|
|
# that modified flag is set |
|
|
|
self.assertTrue(tc.modified) |
|
|
|
|
|
|
|
#import pdb; pdb.set_trace() |
|
|
|
tc.add(('foo', 'foo')) |
|
|
|
|
|
|
|
self.assertEqual(tc.tags(), [ ('bar', 'bar'), ('baz', 'baz'), |
|
|
|
('foo', 'foo') ]) |
|
|
|
|
|
|
|
tc.add(('bleh', 'bleh')) |
|
|
|
|
|
|
|
self.assertEqual(tc.tags(), [ ('baz', 'baz'), ('bleh', 'bleh'), |
|
|
|
('foo', 'foo') ]) |
|
|
|
|
|
|
|
# that reducing the count works |
|
|
|
tc.count = 2 |
|
|
|
|
|
|
|
# and immediately gets rid of extra tags |
|
|
|
self.assertEqual(tc.tags(), [ ('bleh', 'bleh'), |
|
|
|
('foo', 'foo') ]) |
|
|
|
|
|
|
|
def test_cache(self): |
|
|
|
# test basic functionality |
|
|
|
tc = TagCache(count=2) |
|
|
@@ -138,31 +257,5 @@ class _TestTagCache(unittest.TestCase): |
|
|
|
|
|
|
|
ntc.add(('whee', 'whee')) |
|
|
|
|
|
|
|
self.assertEqual(ntc.tags(), [ ('foo', 'foo'), ('whee', 'whee') ]) |
|
|
|
|
|
|
|
# that when the modified flag is cleared |
|
|
|
ntc.store(cachefile) |
|
|
|
ntc = TagCache.load(cachefile) |
|
|
|
|
|
|
|
# that the count can be modified |
|
|
|
ntc.count = 3 |
|
|
|
|
|
|
|
# that modified flag is set after count change |
|
|
|
self.assertTrue(ntc.modified) |
|
|
|
|
|
|
|
ntc.add(('a', 'a')) |
|
|
|
|
|
|
|
ntc.add(('b', 'b')) |
|
|
|
|
|
|
|
# and the count did change |
|
|
|
self.assertEqual(ntc.tags(), [ ('a', 'a'), ('b', 'b'), ('whee', 'whee') ]) |
|
|
|
|
|
|
|
ntc.store(cachefile) |
|
|
|
|
|
|
|
ntc.add(('whee', 'whee')) |
|
|
|
|
|
|
|
# that reducing the count works |
|
|
|
ntc.count = 2 |
|
|
|
|
|
|
|
# and immediately gets rid of extra tags |
|
|
|
self.assertEqual(ntc.tags(), [ ('b', 'b'), ('whee', 'whee') ]) |
|
|
|
self.assertEqual(ntc.tags(), [ ('foo', 'foo'), |
|
|
|
('whee', 'whee') ]) |