Source code for torequests.frequency_controller.async_tools
from asyncio import Lock, sleep
from time import time
[docs]class AsyncFrequency(object):
"""AsyncFrequency controller, means concurrent running n tasks every interval seconds.
Basic Usage::
from torequests.frequency_controller.async_tools import AsyncFrequency
from asyncio import ensure_future, get_event_loop
from time import time
async def test_async():
frequency = AsyncFrequency(2, 1)
async def task():
async with frequency:
return time()
now = time()
tasks = [ensure_future(task()) for _ in range(5)]
result = [await task for task in tasks]
assert result[0] - now < 1
assert result[1] - now < 1
assert result[2] - now > 1
assert result[3] - now > 1
assert result[4] - now > 2
assert frequency.to_dict() == {'n': 2, 'interval': 1}
assert frequency.to_list() == [2, 1]
get_event_loop().run_until_complete(test_async())
"""
__slots__ = ("gen", "__aenter__", "repr", "_lock", "n", "interval")
TIMER = time
def __init__(self, n=None, interval=0):
self.n = n
self.interval = interval
if n:
self.gen = self.generator(n, interval)
self._lock = None
self.__aenter__ = self._acquire
self.repr = f"AsyncFrequency({n}, {interval})"
else:
self.gen = None
self.__aenter__ = self.__aexit__
self.repr = "AsyncFrequency(unlimited)"
[docs] def to_list(self):
"""Return the [self.n, self.interval]"""
return [self.n, self.interval]
[docs] def to_dict(self):
"""Return the dict {'n': self.n, 'interval': self.interval}"""
return {'n': self.n, 'interval': self.interval}
@property
def lock(self):
# lazy init loop
if self._lock is None:
self._lock = Lock()
return self._lock
[docs] async def generator(self, n, interval):
q = [0] * n
while 1:
for index, i in enumerate(q):
# or timeit.default_timer()
now = self.TIMER()
diff = now - i
if diff < interval:
await sleep(interval - diff)
now = self.TIMER()
q[index] = now
# python3.8+ need lock for generator contest, 3.6 3.7 not need
yield now
[docs] @classmethod
def ensure_frequency(cls, frequency):
"""Ensure the given args is AsyncFrequency.
:param frequency: args to create a AsyncFrequency instance.
:type frequency: AsyncFrequency / dict / list / tuple
:return: AsyncFrequency instance
:rtype: AsyncFrequency
"""
if isinstance(frequency, cls):
return frequency
elif isinstance(frequency, dict):
return cls(**frequency)
else:
return cls(*frequency)
async def _acquire(self):
async with self.lock:
return await self.gen.asend(None)
async def __aexit__(self, *args):
pass
def __str__(self):
return repr(self)
def __repr__(self):
return self.repr
def __bool__(self):
return bool(self.gen)