Source code for magento.batches

from typing import List, Callable, Iterable, TypeVar, Generic

from .client import Magento
from .queries import make_field_value_query
from .types import MagentoEntity

BATCH_SIZE = 500


[docs]class BatchSaver: """ Base class to create context managers for asynchronous batches. """ def __init__(self, client: Magento, api_path: str, batch_size=BATCH_SIZE): self.client = client self.path = api_path self.batch_size = batch_size self._batch: List[MagentoEntity] = [] # some stats self._sent_batches = 0 self._sent_items = 0
[docs] def add_item(self, item_data): """ Add an item to the current batch. If it makes the batch large enough, it’s sent to the API and a new empty batch is created. """ self._batch.append(item_data) if len(self._batch) >= self.batch_size: self.send_batch()
[docs] def send_batch(self): """ Send the current pending batch (if any). """ if not self._batch: return self._put_batch() self._sent_items += len(self._batch) self._sent_batches += 1 self._batch = []
def _put_batch(self): # pragma: nocover # Note this raises on error _resp = self.client.put_api(self.path, json=self._batch, throw=True, async_bulk=True).json()
[docs] def finalize(self): """ Send the last pending batch (if any). This doesn’t need to be called when the object is used as a context manager. :return: a dictionary with the total number of batches and items """ self.send_batch() return { "sent_batches": self._sent_batches, "sent_items": self._sent_items, }
def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): self.finalize()
[docs]class ProductBatchSaver(BatchSaver): """ Context manager to add products to an asynchronous batch job. >>> with ProductBatchSaver() as p: ... for product_data in ...: ... p.save_product(product_data) """ def __init__(self, client: Magento, batch_size=BATCH_SIZE): super().__init__(client, '/V1/products/bySku', batch_size=batch_size)
[docs] def save_product(self, product_data: dict): self.add_item({"product": product_data})
T = TypeVar('T')
[docs]class BatchGetter(Generic[T]): """ Base class to create generators of Magento items that can be retrieved from the API using queries. This retrieves items in batches but can be iterated on like any iterator. """ def __init__(self, getter: Callable[..., Iterable[T]], key_field: str, keys: Iterable, batch_size=50): self.batch_size = batch_size self.getter = getter self.key_field = key_field self.keys = keys self._batch: List[str] = [] def _get_batch(self) -> Iterable[T]: if not self._batch: return in_ = ",".join(self._batch) q = make_field_value_query(field=self.key_field, value=in_, condition_type="in") yield from self.getter(query=q, limit=len(self._batch)) self._batch = [] def __iter__(self): for key in self.keys: self._batch.append(str(key)) if len(self._batch) < self.batch_size: continue yield from self._get_batch() yield from self._get_batch()
[docs]class ProductBatchGetter(BatchGetter[dict]): """ Get a bunch of products from an iterable of SKUs: >>> products = ProductBatchGetter(Magento(), ["sku1", "sku2", ...]) >>> for product in products: ... print(product) """ def __init__(self, client: Magento, skus: Iterable[str]): super().__init__(client.get_products, "sku", skus)