from __future__ import annotations
from functools import cached_property
from typing import Union, TYPE_CHECKING, Optional, List
from . import Model
if TYPE_CHECKING:
from magento import Client
from magento.search import SearchQuery
from . import Category, Order, OrderItem, Invoice
[docs]class Product(Model):
"""Wrapper for the ``products`` endpoint"""
STATUS_ENABLED = 1
STATUS_DISABLED = 2
VISIBILITY_NOT_VISIBLE = 1
VISIBILITY_CATALOG = 2
VISIBILITY_SEARCH = 3
VISIBILITY_BOTH = 4
DOCUMENTATION = 'https://adobe-commerce.redoc.ly/2.3.7-admin/tag/products/'
IDENTIFIER = 'sku'
[docs] def __init__(self, data: dict, client: Client):
"""Initialize a Product object using an API response from the ``products`` endpoint
:param data: the API response from the ``products`` endpoint
:param client: an initialized :class:`~.Client` object
"""
super().__init__(
data=data,
client=client,
endpoint='products',
private_keys=True
)
def __repr__(self):
return f'<Magento Product: {self.sku}>'
@property
def excluded_keys(self):
return ['media_gallery_entries']
@property
def uid(self) -> Union[str, int]:
return self.encoded_sku
[docs] def update_stock(self, qty: int) -> bool:
"""Updates the stock quantity
:param qty: the new stock quantity
"""
url = f'{self.data_endpoint()}/stockItems/{self.stock_item_id}'
payload = {
"stock_item": {
"qty": qty,
"is_in_stock": qty > 0
},
'save_options': True
}
response = self.client.put(url, payload)
if response.ok:
self.refresh()
self.logger.info(f'Updated stock to {self.stock} for {self}')
return True
else:
self.logger.error(
f'Failed to update stock for {self} with status code {response.status_code}' + '\n' +
f'Message: {response.json()}'
)
return False
[docs] def update_status(self, status: int) -> bool:
"""Update the product status
:param status: either 1 (for :attr:`~.STATUS_ENABLED`) or 2 (for :attr:`~.STATUS_DISABLED`)
"""
if status not in [Product.STATUS_ENABLED, Product.STATUS_DISABLED]:
raise ValueError('Invalid status provided')
return self.update_attributes({'status': status})
[docs] def update_price(self, price: Union[int, float]) -> bool:
"""Update the product price
:param price: the new price
"""
return self.update_attributes({'price': price})
[docs] def update_special_price(self, price: Union[float, int]) -> bool:
"""Update the product special price
:param price: the new special price
"""
if price < self.price:
return self.update_custom_attributes({'special_price': price})
self.logger.error(f'Sale price for {self} must be less than current price ({self.price})')
return False
[docs] def update_name(self, name: str, scope: Optional[str] = None) -> bool:
"""Update the product name
:param name: the new name to use
:param scope: the scope to send the request on; will use the :attr:`.Client.scope` if not provided
"""
return self.update_attributes({'name': name}, scope)
[docs] def update_description(self, description: str, scope: Optional[str] = None) -> bool:
"""Update the product description
:param description: the new HTML description to use
:param scope: the scope to send the request on; will use the :attr:`.Client.scope` if not provided
"""
return self.update_custom_attributes({'description': description}, scope)
[docs] def update_attributes(self, attribute_data: dict, scope: Optional[str] = None) -> bool:
"""Update top level product attributes with scoping taken into account
.. note:: Product attributes can have a ``Global``, ``Store View`` or ``Website`` scope
:Global Attributes:
Values are updated on all store views and the admin
:Website Attributes:
Values are updated on all store views
:Store View Attributes:
Values are updated on the store view specified in the request ``scope``
A second request will be made to update ``Store View`` and ``Website`` attributes on the admin,
depending on how many :class:`~.Store` :attr:`~.views` you have:
* **1 View:** admin values are updated for all attributes, regardless of scope
* **2+ Views:** admin values are updated only for :attr:`~.website_product_attributes`
:param attribute_data: a dictionary of product attributes to update
:param scope: the scope to send the request on; will use the :attr:`.Client.scope` if not provided
"""
if self.client.store.is_single_store:
return self._update_single_store(attribute_data)
if not self._update_attributes(attribute_data, scope):
return False
if website_attrs := self.client.store.filter_website_attrs(attribute_data):
return self._update_attributes(website_attrs, scope='all')
return True
[docs] def update_custom_attributes(self, attribute_data: dict, scope: Optional[str] = None) -> bool:
"""Update custom attributes with scoping taken into account
See :meth:`~update_attributes` for details
.. admonition:: Important
:class: important-af
This method only supports updating **custom attributes**
:param attribute_data: a dictionary of custom attributes to update
:param scope: the scope to send the request on; will use the :attr:`.Client.scope` if not provided
"""
attributes = {'custom_attributes': self.pack_attributes(attribute_data)}
if self.client.store.is_single_store:
return self._update_single_store(attributes)
if not self._update_attributes(attributes, scope):
return False
if website_attributes := self.client.store.filter_website_attrs(attribute_data):
return self._update_attributes({'custom_attributes': self.pack_attributes(website_attributes)}, scope='all')
return True
def _update_single_store(self, attribute_data: dict) -> bool:
"""Internal function for updating a store with a single store view
All attributes will be updated on the ``default`` and ``all`` scope,
ensuring that the frontend and admin always have the same product data
:param attribute_data: a dictionary of custom product attributes to update
"""
for store_code in (None, 'all'):
if not self._update_attributes(attribute_data, store_code):
return False # Avoid updating admin if store update fails
self.refresh() # Back to default scope
return True
def _update_attributes(self, attribute_data: dict, scope: Optional[str] = None) -> bool:
"""Sends a PUT request to update **top-level** product attributes
.. tip:: to update attributes or custom attributes with attribute scope taken into account,
use :meth:`~.update_attributes` or :meth:`~.update_custom_attributes` instead
:param attribute_data: dict containing any number of top-level attributes to update
:param scope: the scope to send the request on; will use the :attr:`.Client.scope` if not provided
"""
url = self.data_endpoint(scope)
payload = {
"product": {
"sku": self.sku
},
'save_options': True
}
payload['product'].update(attribute_data)
response = self.client.put(url, payload)
if response.ok:
self.refresh(scope)
for key in attribute_data:
self.logger.info(
f"Updated {key} for {self} to {getattr(self, key)} on scope {self.get_scope_name(scope)}")
return True
else:
self.logger.error(
f'Failed with status code {response.status_code}' + '\n' +
f'Message: {response.json()}')
return False
[docs] def get_orders(self) -> Optional[Order | List[Order]]:
"""Searches for orders that contain the product
If the product is configurable, returns orders containing any of its child products
:returns: orders that contain the product, as an individual or list of :class:`~.Order` objects
"""
return self.client.orders.by_product(self)
[docs] def get_order_items(self) -> Optional[OrderItem | List[OrderItem]]:
"""Searches for order items that contain the product
If the product is configurable, returns order items containing any of its child products
:returns: order items that contain the product, as an individual or list of :class:`~.OrderItem` objects
"""
return self.client.order_items.by_product(self)
[docs] def get_invoices(self) -> Optional[Invoice | List[Invoice]]:
"""Searches for invoices that contain the product
If the product is configurable, returns invoices containing any of its child products
:returns: invoices that contain the product, as an individual or list of :class:`~.Invoice` objects
"""
return self.client.invoices.by_product(self)
[docs] def delete(self) -> bool:
"""Deletes the product
.. hint:: If you delete a product by accident, the :class:`Product` object's ``data``
attribute will still contain the raw data, which can be used to recover it.
Alternatively, don't delete it by accident.
"""
url = self.data_endpoint()
response = self.client.delete(url)
if response.ok and response.json() is True:
self.logger.info(f'Deleted {self}')
return True
else:
self.logger.error(
f'Failed to delete {self}. Message: {response.json()["message"]}'
)
return False
[docs] def get_children(self, refresh: bool = False, scope: Optional[str] = None) -> List[Product]:
"""Retrieve the child simple products of a configurable product
:param refresh: if True, calls :meth:`~.Model.refresh` on the child products to retrieve full data
:param scope: the scope to refresh the children on (when ``refresh=True``)
"""
if refresh:
for child in self.children:
child.refresh(scope)
return self.children
@cached_property
def children(self) -> List[Product]:
"""If the Product is a configurable product, returns a list of its child products"""
if self.type_id == 'configurable':
url = self.client.url_for(f'configurable-products/{self.encoded_sku}/children')
if (response := self.client.get(url)).ok:
return [self.parse(child) for child in response.json()]
else:
self.logger.error(f'Failed to get child products of {self}')
else:
self.logger.info('Only configurable products have child SKUs')
return []
@cached_property
def categories(self) -> Optional[Category | List[Category]]:
"""Categories the product is in, returned as a list of :class:`~.Category` objects"""
category_ids = self.custom_attributes.get('category_ids', [])
return self.client.categories.by_list('entity_id', category_ids)
@cached_property
def media_gallery_entries(self) -> List[MediaEntry]:
"""The product's media gallery entries, returned as a list of :class:`MediaEntry` objects"""
return [MediaEntry(self, entry) for entry in self.__media_gallery_entries]
@cached_property
def thumbnail(self) -> MediaEntry:
"""The :class:`MediaEntry` corresponding to the product's thumbnail"""
for entry in self.media_gallery_entries:
if entry.is_thumbnail:
return entry
@property
def thumbnail_link(self) -> str:
"""Link of the product's :attr:`~.thumbnail` image"""
if self.thumbnail:
return self.thumbnail.link
@property
def encoded_sku(self) -> str:
"""URL-encoded SKU, which is used in request endpoints"""
return self.encode(self.sku)
@cached_property
def option_skus(self) -> List[str]:
"""The full SKUs for the product's customizable options, if they exist
.. hint:: When a product with customizable options is ordered, these SKUs are used by the API when
retrieving and searching for :class:`~.Order` and :class:`~.OrderItem` data
"""
option_skus = []
if hasattr(self, 'options'):
for option in self.options:
base_sku = option['product_sku']
for val in option['values']:
if val.get('sku'):
option_skus.append(base_sku + '-' + val['sku'])
return option_skus
@property
def stock(self) -> int:
"""Current stock quantity"""
if self.stock_item:
return self.stock_item['qty']
@property
def stock_item(self) -> dict:
"""Stock data from the StockItem Interface"""
if hasattr(self, 'extension_attributes'): # Missing if product was retrieved by id
if stock_data := self.extension_attributes.get('stock_item', {}):
return stock_data
# Use the SKU to refresh attributes with full product data
self.refresh()
return self.stock_item
@property
def stock_item_id(self) -> int:
"""Item id of the StockItem, used to :meth:`~.update_stock`"""
if self.stock_item:
return self.stock_item['item_id']
@property
def description(self) -> str:
"""Product description (as HTML)"""
return self.custom_attributes.get('description', '')
@property
def special_price(self) -> float:
"""The current special (sale) price"""
return self.custom_attributes.get('special_price')
[docs]class MediaEntry(Model):
"""Wraps a media gallery entry of a :class:`Product`"""
MEDIA_TYPES = ['base', 'small', 'thumbnail', 'swatch']
DOCUMENTATION = "https://adobe-commerce.redoc.ly/2.3.7-admin/tag/productsskumediaentryId"
IDENTIFIER = "id"
[docs] def __init__(self, product: Product, entry: dict):
"""Initialize a MediaEntry object for a :class:`Product`
:param product: the :class:`Product` that the gallery entry is associated with
:param entry: the json response data to use as the source data
"""
super().__init__(
data=entry,
client=product.client,
endpoint=f'products/{product.encoded_sku}/media'
)
self.product = product
def __repr__(self):
return f"<MediaEntry {self.id} for {self.product}: {self.label}>"
[docs] def query_endpoint(self) -> None:
"""No search endpoint exists for media gallery entries"""
return self.logger.info("There is no search interface for media gallery entries")
@property
def excluded_keys(self) -> List[str]:
return []
@property
def is_enabled(self):
return not self.disabled
@property
def is_thumbnail(self):
return 'thumbnail' in self.types
@cached_property
def link(self):
"""Permalink to the image"""
return self.client.store.active.base_media_url + 'catalog/product' + self.file
[docs] def disable(self, scope: Optional[str] = None) -> bool:
"""Disables the MediaEntry on the given scope
:param scope: the scope to send the request on; will use the :attr:`.Client.scope` if not provided
"""
self.data['disabled'] = True
return self.update(scope)
[docs] def enable(self, scope: Optional[str] = None) -> bool:
"""Enables the MediaEntry on the given scope
:param scope: the scope to send the request on; will use the :attr:`.Client.scope` if not provided
"""
self.data['disabled'] = False
return self.update(scope)
[docs] def add_media_type(self, media_type: str, scope: Optional[str] = None) -> bool:
"""Add a media type to the MediaEntry on the given scope
.. caution:: If the media type is already assigned to a different entry, it will be removed
:param media_type: one of the :attr:`~.MEDIA_TYPES`
:param scope: the scope to send the request on; will use the :attr:`.Client.scope` if not provided
"""
if media_type in self.MEDIA_TYPES and media_type not in self.types:
self.data['types'].append(media_type)
return self.update(scope)
[docs] def remove_media_type(self, media_type: str, scope: Optional[str] = None) -> bool:
"""Remove a media type from the MediaEntry on the given scope
:param media_type: one of the :attr:`~MEDIA_TYPES`
:param scope: the scope to send the request on; will use the :attr:`.Client.scope` if not provided
"""
if media_type in self.types:
self.data['types'].remove(media_type)
return self.update(scope)
self.logger.error(f'{media_type} is not currently assigned to {self}')
return False
[docs] def set_media_types(self, types: list, scope: Optional[str] = None) -> bool:
"""Set media types for the MediaEntry on the given scope
:param types: a list containing all :attr:`~MEDIA_TYPES` to assign
:param scope: the scope to send the request on; will use the :attr:`.Client.scope` if not provided
"""
if not isinstance(types, list):
raise TypeError('types must be a list')
self.data['types'] = [t for t in types if t in self.MEDIA_TYPES]
return self.update(scope)
[docs] def set_position(self, position: int, scope: Optional[str] = None) -> bool:
"""Set the position of the MediaEntry on the given scope
:param position: the position to change to
:param scope: the scope to send the request on; will use the :attr:`.Client.scope` if not provided
"""
if not isinstance(position, int):
raise TypeError('position must be an int')
self.data['position'] = position
return self.update(scope)
[docs] def set_alt_text(self, text: str, scope: Optional[str] = None) -> bool:
"""Set the alt text (``label``) of the MediaEntry on the given scope
:param text: the alt text to use
:param scope: the scope to send the request on; will use the :attr:`.Client.scope` if not provided
"""
if not isinstance(text, str):
raise TypeError('text must be a string')
self.data['label'] = text
return self.update(scope)
[docs] def update(self, scope: Optional[str] = None) -> bool:
"""Uses the :attr:`~.data` dict to update the media entry
.. note:: Some updates alter the data of other entries; if the update is successful, the
associated :class:`Product` will be refreshed on the same scope to keep the data consistent
.. tip:: If there's only 1 store view, the admin will also be updated
:param scope: the scope to send the request on; will use the :attr:`.Client.scope` if not provided
"""
if self.client.store.is_single_store:
success = self._update_single_store()
else:
success = self._update(scope)
self.refresh(scope) # Get updated data if success or reset to accurate data if failed
if success:
self.product.refresh(scope)
return success
def _update_single_store(self):
"""Updates the MediaEntry data on the default store view and admin"""
for scope in (None, 'all'):
if not self._update(scope):
return False # Avoid updating admin if store update fails
return True
def _update(self, scope: Optional[str] = None) -> bool:
url = self.data_endpoint(scope)
response = self.client.put(url, payload={'entry': self.data})
if response.ok and response.json() is True:
self.logger.info(
f"Updated {self} on scope {self.get_scope_name(scope)}"
)
return True
else:
self.logger.error(
f"Failed to update {self} on scope {self.get_scope_name(scope)}"
)
return False
[docs]class ProductAttribute(Model):
"""Wrapper for the ``products/attributes`` endpoint"""
DOCUMENTATION = "https://adobe-commerce.redoc.ly/2.3.7-admin/tag/productsattributes/"
IDENTIFIER = "attribute_code"
[docs] def __init__(self, data: dict, client: Client):
"""Initialize a ProductAttribute object using an API response from the ``products/attributes`` endpoint
:param data: the API response from the ``products/attributes`` endpoint
:param client: an initialized :class:`~.Client` object
"""
super().__init__(
data=data,
client=client,
endpoint='products/attributes',
private_keys=True
)
def __repr__(self):
return f"<Product Attribute: {self.attribute_code}>"
@property
def excluded_keys(self) -> List[str]:
return ['options']
@property
def options(self):
return self.unpack_attributes(self.__options, key='label')