Source code for magento.clients

from __future__ import annotations
import json
import pickle
import requests
from functools import cached_property
from typing import Optional, Dict, List
from .utils import MagentoLogger, get_agent
from .models import APIResponse, ProductAttribute
from .search import SearchQuery, OrderSearch, ProductSearch, InvoiceSearch, CategorySearch, ProductAttributeSearch, OrderItemSearch
from .exceptions import AuthenticationError, MagentoError


[docs]class Client: """The class that handles all interaction with the API"""
[docs] def __init__( self, domain: str, username: str, password: str, scope: Optional[str] = '', user_agent: Optional[str] = None, token: Optional[str] = None, log_level: str = 'INFO', login: bool = True, **kwargs ): """Initialize a Client .. admonition:: Important! :class: important-af The Magento account you use to log in must be assigned a **User Role** that has the appropriate API resources included in its **Resource Access** settings This can be verified in Magento Admin by going to:: System -> Permissions -> User Roles -> {Role} -> Role Resources -> Resource Access and ensuring that ``Sales``, ``Catalog``, ``Customers``, and any other desired resources are included :param domain: domain name of the Magento store (ex. ``domain.com``) :param username: username of the Magento Admin account :param password: password of the Magento Admin account :param scope: the store view scope to :meth:`~search` and make requests on :param user_agent: the user agent to use in requests :param token: an existing access token :param log_level: the logging level for logging to stdout :param login: if ``True``, calls :meth:`~.authenticate` upon initialization :param kwargs: see below ... :Optional Keyword Arguments: * **log_file** (``str``) – log file to use for the client's :attr:`logger` * **log_requests** (``bool``) - if ``True``, the logs from :mod:`requests` will be added to the client's ``log_file`` """ #: The base API URL self.BASE_URL: str = f'https://www.{domain}/rest/V1/' #: The user credentials self.USER_CREDENTIALS: Dict[str, str] = { 'username': username, 'password': password } #: The API access token self.ACCESS_TOKEN: str = token #: The Magento store domain self.domain: str = domain #: The store view code to request/update data on self.scope: str = scope #: The user agent to use in requests self.user_agent: str = user_agent if user_agent else get_agent() #: The :class:`~.MagentoLogger` for the domain/username combination self.logger: MagentoLogger = self.get_logger( stdout_level=log_level, log_file=kwargs.get('log_file', None), log_requests=kwargs.get('log_requests', True) ) #: An initialized :class:`Store` object self.store: Store = Store(self) if login: self.authenticate()
[docs] @classmethod def new(cls) -> Client: """Prompts for input to log in to the Magento API""" return cls( input('Domain: '), input('Username: '), input('Password: ') )
[docs] @classmethod def load(cls, pickle_bytes: bytes) -> Client: """Initialize a :class:`~.Client` using a pickle bytestring from :meth:`~.to_pickle`""" return pickle.loads(pickle_bytes)
[docs] @classmethod def from_json(cls, json_str: str) -> Client: """Initialize a :class:`~.Client` from a JSON string of settings""" return cls.from_dict(json.loads(json_str))
[docs] @classmethod def from_dict(cls, d: dict) -> Client: """Initialize a :class:`~.Client` from a dictionary of settings""" return cls(**d)
[docs] def url_for(self, endpoint: str, scope: str = None) -> str: """Returns the appropriate request url for the given API endpoint and store scope .. admonition:: Example :class: example :: # Generate the url for credit memo with id 7 >> api=Client("domain.com", "user", "password") >> api.url_for('creditmemo/7') "https://domain.com/rest/V1/creditmemo/7" # Generate the same url on the "en" store view >> api.url_for('creditmemo/7', scope='en') "https://domain.com/rest/en/V1/creditmemo/7" :param endpoint: the API endpoint :param scope: the scope to generate the url for; uses the :attr:`.Client.scope` if not provided """ if not scope: if self.scope and scope is None: scope = self.scope else: return self.BASE_URL + endpoint return self.BASE_URL.replace('/V1', f'/{scope}/V1') + endpoint
[docs] def search(self, endpoint: str) -> SearchQuery: """Initializes and returns a :class:`~.SearchQuery` corresponding to the specified endpoint .. note:: Several endpoints have predefined :class:`~.SearchQuery` and :class:`~.Model` subclasses If a subclass hasn't been defined for the ``endpoint`` yet, a general :class:`~.SearchQuery` will be returned, which wraps the :attr:`~.SearchQuery.result` with :class:`~.APIResponse` :param endpoint: a valid Magento API search endpoint """ if endpoint.lower() == 'orders': return self.orders if endpoint.lower() == 'orders/items': return self.order_items if endpoint.lower() == 'invoices': return self.invoices if endpoint.lower() == 'categories': return self.categories if endpoint.lower() == 'products': return self.products if endpoint.lower() == 'products/attributes': return self.product_attributes # Any other endpoint is queried with a general SearchQuery object return SearchQuery(endpoint=endpoint, client=self)
@property def orders(self) -> OrderSearch: """Initializes an :class:`~.OrderSearch`""" return OrderSearch(self) @property def order_items(self) -> OrderItemSearch: """Initializes an :class:`~.OrderItemSearch`""" return OrderItemSearch(self) @property def invoices(self) -> InvoiceSearch: """Initializes an :class:`~.InvoiceSearch`""" return InvoiceSearch(self) @property def categories(self) -> CategorySearch: """Initializes a :class:`~.CategorySearch`""" return CategorySearch(self) @property def products(self) -> ProductSearch: """Initializes a :class:`~.ProductSearch`""" return ProductSearch(self) @property def product_attributes(self) -> ProductAttributeSearch: """Initializes a :class:`~.ProductAttributeSearch`""" return ProductAttributeSearch(self)
[docs] def get(self, url: str) -> requests.Response: """Sends an authorized ``GET`` request :param url: the URL to make the request on """ return self.request('GET', url)
[docs] def post(self, url: str, payload: dict) -> requests.Response: """Sends an authorized ``POST`` request :param url: the URL to make the request on :param payload: the JSON payload for the request """ return self.request('POST', url, payload)
[docs] def put(self, url: str, payload: dict) -> requests.Response: """Sends an authorized ``PUT`` request :param url: the URL to make the request on :param payload: the JSON payload for the request """ return self.request('PUT', url, payload)
[docs] def delete(self, url: str) -> requests.Response: """Sends an authorized ``DELETE`` request :param url: the URL to make the request on """ return self.request('DELETE', url)
[docs] def authenticate(self) -> bool: """Authenticates the :attr:`~.USER_CREDENTIALS` and retrieves an access token """ endpoint = self.url_for('integration/admin/token') payload = self.USER_CREDENTIALS headers = { 'Content-Type': 'application/json', 'User-Agent': self.user_agent } self.logger.info(f'Authenticating {payload["username"]} on {self.domain}...') response = requests.post( url=endpoint, json=payload, headers=headers ) if response.ok: self.ACCESS_TOKEN = response.json() else: raise AuthenticationError(self, response=response) self.logger.debug('Validating token...') try: self.validate() except AuthenticationError as e: raise AuthenticationError(self, msg='Token validation failed') from e self.logger.info('Logged in to {}'.format(payload["username"])) return True
[docs] def validate(self) -> bool: """Validates the :attr:`~.token` by sending an authorized request to a standard API endpoint """ response = self.get(self.url_for('store/websites')) if response.status_code == 200: self.logger.debug("Token validated for {} on {}".format( self.USER_CREDENTIALS['username'], self.domain) ) return True else: msg = "Token validation failed for {} on {}".format( self.USER_CREDENTIALS['username'], self.domain ) raise AuthenticationError(self, msg=msg, response=response)
[docs] def request(self, method: str, url: str, payload: dict = None) -> requests.Response: """Sends an authorized API request. Used for all internal requests .. tip:: Use :meth:`get`, :meth:`post`, :meth:`put` or :meth:`delete` instead :param method: the request method :param url: the url to send the request to :param payload: the JSON payload for the request (if the method is ``POST`` or ``PUT``) """ if method in ('GET', 'DELETE'): response = requests.request(method, url, headers=self.headers) elif method in ('POST', 'PUT'): if payload: response = requests.request(method, url, json=payload, headers=self.headers) else: raise ValueError('Must provide a non-empty payload') else: raise ValueError('Invalid request method provided') if response.status_code == 401: self.logger.debug("Attempting to re-authenticate...") self.authenticate() # Will raise AuthenticationError if unsuccessful (won't recurse infinitely) return self.request(method, url, payload) if response.status_code != 200: # All other responses are returned; errors are handled by methods self.logger.error("Request to {} failed with status code {}.\n{message}".format( url, response.status_code, message=MagentoError.parse(response)) ) return response
[docs] def get_logger(self, log_file: str = None, stdout_level: str = 'INFO', log_requests: bool = True) -> MagentoLogger: """Retrieve a MagentoLogger for the current username/domain combination. Log files are DEBUG. :param log_file: the file to log to :param stdout_level: the logging level for stdout logging :param log_requests: if ``True``, adds the :class:`~.FileHandler` to the :mod:`~.urllib3.connectionpool` logger """ logger_name = MagentoLogger.CLIENT_LOG_NAME.format( domain=self.domain.split('.')[0], username=self.USER_CREDENTIALS['username'] ) # Example:``domain_username`` return MagentoLogger( name=logger_name, log_file=log_file, stdout_level=stdout_level, log_requests=log_requests )
@property def headers(self) -> dict: """Authorization headers for API requests Automatically generates a :attr:`token` if needed """ return { 'Authorization': f'Bearer {self.token}', 'User-Agent': self.user_agent } @property def token(self) -> str: """Returns or generates an :attr:`~ACCES_TOKEN`""" if not self.ACCESS_TOKEN: self.authenticate() return self.ACCESS_TOKEN
[docs] def to_pickle(self, validate: bool = False) -> bytes: """Serializes the Client to a pickle bytestring :param validate: if ``True``, validates the :attr:`token`/:attr:`USER_CREDENTIALS` before serializing """ if validate: self.validate() return pickle.dumps(self)
[docs] def to_json(self, validate: bool = False) -> str: """Serializes the Client to a JSON string :param validate: if ``True``, validates the :attr:`token`/:attr:`USER_CREDENTIALS` before serializing """ data = self.to_dict(validate) return json.dumps(data)
[docs] def to_dict(self, validate: bool = False) -> Dict[str, str]: """Serializes the Client to a dictionary :param validate: if ``True``, validates the :attr:`token`/:attr:`USER_CREDENTIALS` before serializing """ if validate: self.validate() data = { 'domain': self.domain, 'username': self.USER_CREDENTIALS['username'], 'password': self.USER_CREDENTIALS['password'], 'scope': self.scope, 'user_agent': self.user_agent, 'token': self.token, 'log_level': self.logger.logger.level, 'log_file': self.logger.log_file } return data
[docs] def view_config(self): """Prints the Client configuration settings""" for k, v in self.to_dict().items(): print(f'{k} : {v}')
[docs]class Store: """Class containing store configurations and cached attribute lists"""
[docs] def __init__(self, client: Client): """Initialize a Store object :param client: an initialized :class:`~.Client` object """ self.client = client
@property def is_single_store(self) -> bool: """Whether the store has a single store view (``default``) or multiple store views""" return len(self.configs) == 1 @property def active(self) -> APIResponse: """Returns the store config corresponding to the current :attr:`~.Client.scope` of the :class:`Client`""" store_code = 'default' if self.client.scope in ('', 'all') else self.client.scope for store in self.configs: if store.code == store_code: return store @cached_property def configs(self) -> Optional[APIResponse | List[APIResponse]]: """Returns a list of all store configurations""" return self.client.search('store/storeConfigs').execute() @cached_property def views(self) -> Optional[APIResponse | List[APIResponse]]: """Returns a list of all store views""" return self.client.search('store/storeViews').execute() @cached_property def all_product_attributes(self) -> List[ProductAttribute]: """A cached list of all product attributes""" return self.client.product_attributes.get_all() @cached_property def store_view_product_attributes(self) -> List[ProductAttribute]: """A cached list of all product attributes with the ``Store View`` scope""" return [attr for attr in self.all_product_attributes if attr.scope == 'store'] @cached_property def website_product_attributes(self) -> List[ProductAttribute]: """A cached list of all product attributes with the ``Web Site`` scope""" return [attr for attr in self.all_product_attributes if attr.scope == 'website'] @cached_property def global_product_attributes(self) -> List[ProductAttribute]: """A cached list of all product attributes with the ``Global`` scope""" return [attr for attr in self.all_product_attributes if attr.scope == 'global'] @cached_property def website_attribute_codes(self) -> List[str]: """The attribute codes of the :attr:`~.website_product_attributes`""" return [attr.attribute_code for attr in self.website_product_attributes]
[docs] def filter_website_attrs(self, attribute_data: dict) -> dict: """Filters a product attribute dict and returns a new one that contains only the website scope attributes Website scoped attributes must be updated on the admin by making a second request on the ``all`` scope * This method is called by :meth:`~.Product.update_attributes` and :meth:`~.Product.update_custom_attributes` to see if the second request is needed .. admonition:: **Example** :class: example The ``price`` attribute is ``Website`` scope and the ``meta_title`` attribute is ``Store View`` scope :: >> attribute_data = {'price': 12, 'meta_title': 'My Product'} >> store.filter_website_attrs(attribute_data) {'price': 12} :param attribute_data: a dict of product attributes """ return {k: v for k, v in attribute_data.items() if k in self.website_attribute_codes}
[docs] def refresh(self) -> bool: """Clears all cached properties""" cached = ('configs', 'views', 'all_product_attributes', 'store_view_product_attributes', 'website_product_attributes', 'global_product_attributes', 'website_attribute_codes') for key in cached: self.__dict__.pop(key, None) return True