# Source code for asyncdex.models.pager
import asyncio
from collections import deque
from math import ceil
from typing import Any, AsyncIterator, Generic, MutableMapping, Optional, TYPE_CHECKING, Type, TypeVar
from .abc import GenericModelList, Model, ModelList
if TYPE_CHECKING:
from ..client import MangadexClient
# Type variable bound to Model; Pager is generic over it, and it is the type of the items a Pager yields.
_ModelT = TypeVar("_ModelT", bound=Model)
class Pager(AsyncIterator[_ModelT], Generic[_ModelT]):
    """A pager object which automatically paginates responses with an offset and limit combo.

    The first request is made lazily on the first iteration step. The ``total`` reported by that response is
    used to schedule every remaining page as a background task, and the pages are consumed in offset order.

    .. versionadded:: 0.3

    :param url: The URL to paginate against.
    :type url: str
    :param model: A subclass of :class:`.Model` to transform the results into.
    :type model: Type[Model]
    :param client: The client that is used to make the requests.
    :type client: MangadexClient
    :param params: Additional params to include in every request. The mapping is copied, so the caller's object
        is never modified by the pager.
    :type params: MutableMapping[str, Any]
    :param param_size: How many parameters can be included in a given request. Defaults to ``150``.
    :type param_size: int
    :param limit_size: The maximum limit for each request. Defaults to ``100``.
    :type limit_size: int
    :param limit: The maximum number of items that the pager will return. ``None`` means no cap.
    :type limit: Optional[int]
    :raises ValueError: If the params hold more values than ``param_size`` allows in a single request.
    """

    url: str
    """The URL to paginate against."""

    model: Type[_ModelT]
    """A subclass of :class:`.Model` to transform the results into."""

    client: "MangadexClient"
    """The client that is associated with the Pager."""

    params: MutableMapping[str, Any]
    """Additional params to include in every request."""

    limit: Optional[int]
    """The Pager will only return up to these many items.

    .. versionadded:: 0.4
    """

    returned: int
    """How many items were returned so far.

    .. versionadded:: 0.5
    """

    param_size: int
    """How many parameters can be included in a given request.

    .. versionadded:: 1.0
    """

    def __init__(
        self,
        url: str,
        model: Type[_ModelT],
        client: "MangadexClient",
        *,
        params: Optional[MutableMapping[str, Any]] = None,
        param_size: int = 150,
        limit_size: int = 100,
        limit: Optional[int] = None,
    ):
        self.url = url
        self.model = model
        self.client = client
        self.limit = limit
        self.returned = 0
        # Copy the mapping: the pager rewrites "offset" and "limit", and a caller reusing its dict for a second
        # pager must not inherit the advanced offset of the first one.
        self.params = dict(params) if params else {}
        self.params.setdefault("offset", 0)
        self.params["limit"] = limit_size
        self.param_size = param_size
        if self.limit and self.params["limit"] > self.limit:
            # Never ask the API for more items per page than the caller wants in total.
            self.params["limit"] = self.limit
        # A queue that fills after network requests. It lets the pager hand out the items of a response one at a
        # time instead of all at once.
        self._queue = deque()
        # Tasks for the pages after the first one, in ascending offset order.
        self._reqs = deque()
        # True once the first response was processed; later calls to _do_request are background page fetches.
        self._started_parallel = False
        # True once the first response was processed and every follow-up page has been scheduled.
        self._done = False

        # Count the parameter values to make sure that they fit into a single request.
        single_params = 0
        iterator_params = {}
        for key, value in self.params.items():
            if not isinstance(value, str) and hasattr(value, "__iter__"):
                iterator_params[key] = list(value)
            else:
                single_params += 1
        remaining_params = self.param_size - single_params
        leftover = sum(len(item) for item in iterator_params.values())
        # Comparing directly (instead of dividing) also covers remaining_params <= 0 without a ZeroDivisionError.
        if leftover > max(remaining_params, 0):
            name, values = max(iterator_params.items(), key=lambda entry: len(entry[1]))
            raise ValueError(
                "There are more parameters specified than the amount that can be safely handled by the "
                f"API.\nLargest parameter: {name} with {len(values)} items"
            )

    def __aiter__(self) -> AsyncIterator[_ModelT]:
        """Return an async iterator (itself)

        :return: The Pager class.
        :rtype: Pager
        """
        return self

    async def _extract_item(self, task: asyncio.Task):
        """Wait for a background page request and move its models into the queue.

        :param task: A task wrapping :meth:`._do_request` for one of the follow-up pages.
        :type task: asyncio.Task
        """
        self._queue.extend(await task)

    async def _do_request(self, offset=None):
        """Request a single page.

        The first call fills the queue directly and schedules a task for each page that is still needed. Every
        later call (made from those tasks) only returns the models of its page.

        :param offset: The offset to request. Defaults to the pager's current ``offset`` param.
        :type offset: Optional[int]
        :return: ``None`` for the first call, otherwise the list of models on the requested page.
        """
        if offset is None:
            offset = self.params["offset"]
        r = await self.client.request("GET", self.url, params={**self.params, "offset": offset}, add_includes=True)
        try:
            # A 204 response has no body to parse.
            payload = None if r.status == 204 else await r.json()
        finally:
            # Release the response even on a 204 or when the body cannot be parsed.
            r.close()

        if self._started_parallel:
            if payload is None:
                return []
            return [self.model(self.client, data=item) for item in payload["results"] if item]

        if payload is None:
            # Nothing to paginate at all; the empty queue makes __anext__ stop.
            self._done = True
            return None

        page_size = self.params["limit"]
        total = payload["total"]
        models = [self.model(self.client, data=item) for item in payload["results"] if item]
        # Flip both flags together, and only after the response was parsed, so that a failure above leaves the
        # pager in its initial state and the first request can simply be retried.
        self._started_parallel = True
        self._done = True
        self._queue.extend(models)
        if total <= offset + page_size:
            return None

        self.params["offset"] = offset + page_size
        # Items that are still wanted, counted from the starting offset and capped by the overall limit.
        wanted = total - offset
        if self.limit is not None and self.limit > 0:
            wanted = min(wanted, self.limit)
        extra = max(0, ceil((wanted - page_size) / page_size))
        for i in range(extra):
            self._reqs.append(
                asyncio.create_task(self._do_request(offset=self.params["offset"] + page_size * i))
            )
        return None

    async def __anext__(self) -> _ModelT:
        """Return a model from the queue. If there are no items remaining, a request is made to fetch the next set of
        items.

        .. versionchanged:: 0.4
            This method will no longer hang to complete all requests.

        .. versionchanged:: 0.5
            This method will fully respect limits even if the API does not.

        :return: The new model.
        :rtype: Model
        :raises StopAsyncIteration: If the limit was reached or no items are left.
        """
        if self.limit and self.returned >= self.limit:
            raise StopAsyncIteration
        # Keep going until an item is available: a page may legitimately come back empty while later pages still
        # hold items.
        while not self._queue:
            if not self._done:
                await self._do_request(self.params["offset"])
            elif self._reqs:
                await self._extract_item(self._reqs.popleft())
            else:
                raise StopAsyncIteration
        item = self._queue.popleft()
        # Only count items that are actually handed to the caller.
        self.returned += 1
        return item

    def __repr__(self) -> str:
        """Provide a string representation of the object.

        :return: The string representation
        :rtype: str
        """
        return f"{type(self).__name__}(url={self.url!r}, offset={self.params['offset']}, limit={self.params['limit']})"

    async def as_list(self) -> ModelList[_ModelT]:
        """Returns all items in the Pager as a list.

        .. versionchanged:: 0.5
            If :attr:`.model` is :class:`.Manga`, this method will return :class:`.MangaList`. Otherwise, this method
            will return a :class:`.GenericModelList`.

        :return: A :class:`.ModelList` with the total models.

            .. versionchanged:: 0.5
                Prior to 0.5, this method returned a normal :class:`list`.

        :rtype: ModelList
        """
        # Imported here rather than at module level, as in the original code.
        from .manga import Manga
        from .manga_list import MangaList

        items = [item async for item in self]
        if issubclass(self.model, Manga):
            return MangaList(self.client, entries=items)
        return GenericModelList(items)