diff --git a/.meta.toml b/.meta.toml index d33e21f1..9c960e73 100644 --- a/.meta.toml +++ b/.meta.toml @@ -9,6 +9,15 @@ commit-id = "2.2.2" codespell_extra_lines = """ exclude: docs/locale/.*.pot """ +extra_lines = """ +- repo: https://github.com/pre-commit/mirrors-mypy + rev: v1.18.2 + hooks: + - id: mypy + additional_dependencies: + - types-decorator + exclude: docs +""" [pyproject] check_manifest_ignores = """ diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index e894b502..d0b95ecf 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -76,6 +76,13 @@ repos: hooks: - id: i18ndude +- repo: https://github.com/pre-commit/mirrors-mypy + rev: v1.18.2 + hooks: + - id: mypy + additional_dependencies: + - types-decorator + exclude: docs ## # Add extra configuration options in .meta.toml: diff --git a/news/+a04874c0.internal.md b/news/+a04874c0.internal.md new file mode 100644 index 00000000..08857852 --- /dev/null +++ b/news/+a04874c0.internal.md @@ -0,0 +1 @@ +Add typing annotations [@ale-rt] diff --git a/news/+ec620103.internal.md b/news/+ec620103.internal.md new file mode 100644 index 00000000..b9f9179f --- /dev/null +++ b/news/+ec620103.internal.md @@ -0,0 +1 @@ +More typing annotations for the typing module diff --git a/setup.py b/setup.py index ec5ae28a..8de88c2c 100644 --- a/setup.py +++ b/setup.py @@ -36,6 +36,7 @@ "plone.dexterity", "plone.i18n", "plone.registry", + "plone.supermodel", "plone.uuid", "zope.globalrequest", "Products.CMFCore", diff --git a/src/plone/api/addon.py b/src/plone/api/addon.py index 1b8163aa..9993bec5 100644 --- a/src/plone/api/addon.py +++ b/src/plone/api/addon.py @@ -11,6 +11,8 @@ from Products.CMFPlone.interfaces import INonInstallable from Products.CMFPlone.utils import get_installer from Products.GenericSetup import EXTENSION +from typing import Any +from typing import cast from typing import Dict from typing import List from typing import Tuple @@ -98,7 +100,7 @@ def _get_non_installable_addons() -> NonInstallableAddons: @lru_cache(maxsize=1) -def _cached_addons() -> Tuple[Tuple[str, AddonInformation]]: +def _cached_addons() -> Tuple[Tuple[str, AddonInformation], ...]: """Return information about add-ons in this installation. :returns: Tuple of tuples with add-on id and AddonInformation. @@ -134,7 +136,7 @@ def _cached_addons() -> Tuple[Tuple[str, AddonInformation]]: profile_type = pid_parts[-1] if product_id not in addons: # get some basic information on the product - product = { + product: Dict[str, Any] = { "id": product_id, "version": get_version(product_id), "title": product_id, @@ -278,7 +280,9 @@ def get(addon: str) -> AddonInformation: addons = dict(_cached_addons()) if addon not in addons: raise InvalidParameterError(f"No add-on {addon} found.") - return _update_addon_info(addons.get(addon), _get_installer()) + return _update_addon_info( + cast(AddonInformation, addons.get(addon)), _get_installer() + ) @required_parameters("addon") diff --git a/src/plone/api/content.py b/src/plone/api/content.py index a0a660f0..a677b4e2 100644 --- a/src/plone/api/content.py +++ b/src/plone/api/content.py @@ -2,6 +2,7 @@ from Acquisition import aq_chain from Acquisition import aq_inner +from Acquisition import ImplicitAcquisitionWrapper from copy import copy as _copy from itertools import islice from plone.api import portal @@ -14,6 +15,15 @@ from plone.uuid.interfaces import IUUID from Products.CMFCore.DynamicType import DynamicType from Products.CMFCore.WorkflowCore import WorkflowException +from typing import Any +from typing import Callable +from typing import cast +from typing import Dict +from typing import Iterator +from typing import List +from typing import Optional +from typing import Tuple +from typing import Union from zope.component import ComponentLookupError from zope.component import getMultiAdapter from zope.component import getSiteManager @@ -21,12 +31,17 @@ from zope.globalrequest import getRequest from zope.interface import Interface from zope.interface import providedBy +from zope.interface.interface import InterfaceClass +from ZPublisher.BaseRequest import RequestContainer +from ZPublisher.HTTPRequest import HTTPRequest +from ZTUtils.Lazy import LazyCat +from ZTUtils.Lazy import LazyMap import transaction import uuid -_marker = [] +_marker = object() # Maximum number of attempts to generate a unique random ID MAX_UNIQUE_ID_ATTEMPTS = 100 @@ -35,13 +50,13 @@ @required_parameters("container", "type") @at_least_one_of("id", "title") def create( - container=None, - type=None, - id=None, - title=None, - safe_id=False, + container: Optional[ImplicitAcquisitionWrapper] = None, + type: Optional[str] = None, + id: Optional[str] = None, + title: Optional[str] = None, + safe_id: bool = False, **kwargs, # NOQA: C816, S101 -): +) -> ImplicitAcquisitionWrapper: """Create a new content item. :param container: [required] Container object in which to create the new @@ -77,7 +92,7 @@ def create( attempts = 0 while attempts < MAX_UNIQUE_ID_ATTEMPTS: content_id = str(uuid.uuid4()) - if content_id not in container: + if content_id not in cast(ImplicitAcquisitionWrapper, container): break attempts += 1 # If we couldn't find a unique ID after max attempts, raise ValueError @@ -88,13 +103,18 @@ def create( kwargs["title"] = title try: - container.invokeFactory(type, content_id, **kwargs) + cast(ImplicitAcquisitionWrapper, container).invokeFactory( + type, cast(str, content_id), **kwargs + ) except UnicodeDecodeError: # UnicodeDecodeError is a subclass of ValueError, # so will be swallowed below unless we re-raise it here raise except ValueError as e: - types = [fti.getId() for fti in container.allowedContentTypes()] + types = [ + fti.getId() + for fti in cast(ImplicitAcquisitionWrapper, container).allowedContentTypes() + ] raise InvalidParameterError( "Cannot add a '{obj_type}' object with id={obj_id} to the container {container_path}.\n" @@ -102,17 +122,21 @@ def create( "{allowed_types}\n" "{message}".format( obj_type=type, - obj_id=content_id, - container_path="/".join(container.getPhysicalPath()), + obj_id=cast(str, content_id), + container_path="/".join( + cast(ImplicitAcquisitionWrapper, container).getPhysicalPath() + ), allowed_types="\n".join(sorted(types)), message=str(e), ), ) - content = container[content_id] + content: ImplicitAcquisitionWrapper = cast(ImplicitAcquisitionWrapper, container)[ + cast(str, content_id) + ] if not id or (safe_id and id): # Create a new id from title - chooser = INameChooser(container) + chooser = INameChooser(cast(ImplicitAcquisitionWrapper, container)) derived_id = id or title new_id = chooser.chooseName(derived_id, content) # kacee: we must do a partial commit, else the renaming fails because @@ -129,7 +153,9 @@ def create( @mutually_exclusive_parameters("path", "UID") @at_least_one_of("path", "UID") -def get(path=None, UID=None): +def get( + path: Optional[str] = None, UID: Optional[str] = None +) -> Optional[ImplicitAcquisitionWrapper]: """Get an object. :param path: Path to the object we want to get, relative to @@ -151,10 +177,10 @@ def get(path=None, UID=None): relative_path=path, ) try: - path = path.split("/") - if len(path) > 1: - parent = site.unrestrictedTraverse(path[:-1]) - content = parent.restrictedTraverse(path[-1]) + path_list = cast(str, path).split("/") + if len(path_list) > 1: + parent = site.unrestrictedTraverse(path_list[:-1]) + content = parent.restrictedTraverse(path_list[-1]) else: content = site.restrictedTraverse(path[-1]) except (KeyError, AttributeError): @@ -166,11 +192,17 @@ def get(path=None, UID=None): elif UID: return uuidToObject(UID) + return None @required_parameters("source") @at_least_one_of("target", "id") -def move(source=None, target=None, id=None, safe_id=False): +def move( + source: Optional[ImplicitAcquisitionWrapper] = None, + target: Optional[ImplicitAcquisitionWrapper] = None, + id: Optional[str] = None, + safe_id: bool = False, +) -> ImplicitAcquisitionWrapper: """Move the object to the target container. :param source: [required] Object that we want to move. @@ -195,6 +227,7 @@ def move(source=None, target=None, id=None, safe_id=False): ValueError :Example: :ref:`content-move-example` """ + source = cast(ImplicitAcquisitionWrapper, source) source_id = source.getId() # If no target is given the object is probably renamed @@ -212,7 +245,11 @@ def move(source=None, target=None, id=None, safe_id=False): @required_parameters("obj", "new_id") -def rename(obj=None, new_id=None, safe_id=False): +def rename( + obj: Optional[ImplicitAcquisitionWrapper] = None, + new_id: Optional[str] = None, + safe_id: bool = False, +) -> ImplicitAcquisitionWrapper: """Rename the object. :param obj: [required] Object that we want to rename. @@ -226,6 +263,7 @@ def rename(obj=None, new_id=None, safe_id=False): :returns: Content object that was renamed :Example: :ref:`content-rename-example` """ + obj = cast(ImplicitAcquisitionWrapper, obj) obj_id = obj.getId() container = obj.aq_parent @@ -240,7 +278,12 @@ def rename(obj=None, new_id=None, safe_id=False): @required_parameters("source") @at_least_one_of("target", "id") -def copy(source=None, target=None, id=None, safe_id=False): +def copy( + source: Optional[ImplicitAcquisitionWrapper] = None, + target: Optional[ImplicitAcquisitionWrapper] = None, + id: Optional[str] = None, + safe_id: bool = False, +) -> ImplicitAcquisitionWrapper: """Copy the object to the target container. :param source: [required] Object that we want to copy. @@ -264,6 +307,7 @@ def copy(source=None, target=None, id=None, safe_id=False): ValueError :Example: :ref:`content-copy-example` """ + source = cast(ImplicitAcquisitionWrapper, source) source_id = source.getId() if target is None: @@ -286,7 +330,11 @@ def copy(source=None, target=None, id=None, safe_id=False): @mutually_exclusive_parameters("obj", "objects") @at_least_one_of("obj", "objects") -def delete(obj=None, objects=None, check_linkintegrity=True): +def delete( + obj: Optional[ImplicitAcquisitionWrapper] = None, + objects: Optional[List[ImplicitAcquisitionWrapper]] = None, + check_linkintegrity: bool = True, +): """Delete the object(s). :param obj: Object that we want to delete. @@ -328,7 +376,9 @@ def delete(obj=None, objects=None, check_linkintegrity=True): @required_parameters("obj") -def get_state(obj=None, default=_marker): +def get_state( + obj: Optional[ImplicitAcquisitionWrapper] = None, default: Any = _marker +) -> str: """Get the current workflow state of the object. :param obj: [required] Object that we want to get the state for. @@ -351,12 +401,17 @@ def get_state(obj=None, default=_marker): # work backwards from our end state -def _find_path(maps, path, current_state, start_state): +def _find_path( + maps: Dict[str, List[Tuple[str, List[str]]]], + path: List[Union[str, Any]], + current_state: str, + start_state: str, +) -> Optional[List[str]]: paths = [] # current_state could not be on maps if it only has outgoing # transitions. i.e an initial state you are not able to return to. if current_state not in maps: - return + return None for new_transition, from_states in maps[current_state]: next_path = _copy(path) @@ -382,7 +437,9 @@ def _find_path(maps, path, current_state, start_state): return len(paths) and min(paths, key=len) or None -def _wf_transitions_for(workflow, from_state, to_state): +def _wf_transitions_for( + workflow: ImplicitAcquisitionWrapper, from_state: str, to_state: str +) -> Optional[List[str]]: """Get list of transition IDs required to transition. from ``from_state`` to ``to_state``. @@ -396,7 +453,7 @@ def _wf_transitions_for(workflow, from_state, to_state): :returns: A list of transitions :rtype: list """ - exit_state_maps = {} + exit_state_maps: Dict[str, List[str]] = {} for state in workflow.states.objectValues(): for transition in state.getTransitions(): exit_state_maps.setdefault(transition, []) @@ -420,7 +477,12 @@ def _wf_transitions_for(workflow, from_state, to_state): return _find_path(transition_maps, [], to_state, from_state) -def _transition_to(obj, workflow, to_state, **kwargs): +def _transition_to( + obj: ImplicitAcquisitionWrapper, + workflow: ImplicitAcquisitionWrapper, + to_state: str, + **kwargs, +): # move from the current state to the given state # via any route we can find for wf in workflow.getWorkflowsFor(obj): @@ -454,7 +516,12 @@ def _transition_to(obj, workflow, to_state, **kwargs): @required_parameters("obj") @at_least_one_of("transition", "to_state") @mutually_exclusive_parameters("transition", "to_state") -def transition(obj=None, transition=None, to_state=None, **kwargs): +def transition( + obj: Optional[ImplicitAcquisitionWrapper] = None, + transition: Optional[str] = None, + to_state: Optional[str] = None, + **kwargs, +): """Perform a workflow transition. for the object or attempt to perform @@ -475,6 +542,7 @@ def transition(obj=None, transition=None, to_state=None, **kwargs): :class:`~plone.api.exc.InvalidParameterError` :Example: :ref:`content-transition-example` """ + obj = cast(ImplicitAcquisitionWrapper, obj) workflow = portal.get_tool("portal_workflow") if transition is not None: try: @@ -488,6 +556,7 @@ def transition(obj=None, transition=None, to_state=None, **kwargs): "{}".format(transition, "\n".join(sorted(transitions))), ) else: + to_state = cast(str, to_state) _transition_to(obj, workflow, to_state, **kwargs) if workflow.getInfoFor(obj, "review_state") != to_state: raise InvalidParameterError( @@ -499,7 +568,7 @@ def transition(obj=None, transition=None, to_state=None, **kwargs): @required_parameters("obj") -def disable_roles_acquisition(obj=None): +def disable_roles_acquisition(obj: Optional[ImplicitAcquisitionWrapper] = None): """Disable acquisition of local roles on given obj. Set __ac_local_roles_block__ = 1 on obj. @@ -513,7 +582,7 @@ def disable_roles_acquisition(obj=None): @required_parameters("obj") -def enable_roles_acquisition(obj=None): +def enable_roles_acquisition(obj: Optional[ImplicitAcquisitionWrapper] = None): """Enable acquisition of local roles on given obj. Set __ac_local_roles_block__ = 0 on obj. @@ -527,7 +596,11 @@ def enable_roles_acquisition(obj=None): @required_parameters("name", "context") -def get_view(name=None, context=None, request=None): +def get_view( + name: Optional[str] = None, + context: Optional[ImplicitAcquisitionWrapper] = None, + request: Optional[HTTPRequest] = None, +): """Get a BrowserView object. :param name: [required] Name of the view. @@ -573,7 +646,7 @@ def get_view(name=None, context=None, request=None): @required_parameters("obj") -def get_uuid(obj=None): +def get_uuid(obj: Optional[ImplicitAcquisitionWrapper] = None) -> str: """Get the object's Universally Unique IDentifier (UUID). :param obj: [required] Object we want its UUID. @@ -588,7 +661,10 @@ def get_uuid(obj=None): @required_parameters("obj") -def get_path(obj=None, relative=False): +def get_path( + obj: Optional[Union[ImplicitAcquisitionWrapper, object]] = None, + relative: bool = False, +) -> str: """Get the path of an object. :param obj: [required] Object for which to get its path @@ -617,7 +693,7 @@ def get_path(obj=None, relative=False): return "/".join(rel_path) if rel_path else "" -def _parse_object_provides_query(query): +def _parse_object_provides_query(query: Any) -> Dict[str, Union[str, List[str]]]: """Create a query for the object_provides index. :param query: [required] @@ -644,7 +720,7 @@ def _parse_object_provides_query(query): query_not = [query_not] query_not = [getattr(x, "__identifier__", x) for x in query_not] - result = {} + result: Dict[str, Union[str, List[str]]] = {} if ifaces: result["query"] = ifaces @@ -656,7 +732,12 @@ def _parse_object_provides_query(query): return result -def find(context=None, depth=None, unrestricted=False, **kwargs): +def find( + context: Optional[ImplicitAcquisitionWrapper] = None, + depth: Optional[int] = None, + unrestricted: bool = False, + **kwargs, +) -> Union[LazyMap, LazyCat]: """Find content in the portal. :param context: Context for the search @@ -669,7 +750,7 @@ def find(context=None, depth=None, unrestricted=False, **kwargs): :Example: :ref:`content-find-example` """ - query = {} + query: Dict[str, Any] = {} query.update(**kwargs) # Save the original path to maybe restore it later. @@ -717,7 +798,12 @@ def find(context=None, depth=None, unrestricted=False, **kwargs): @required_parameters("obj") -def iter_ancestors(obj=None, function=None, interface=None, stop_at=_marker): +def iter_ancestors( + obj: Optional[Union[ImplicitAcquisitionWrapper, object]] = None, + function: Optional[Callable] = None, + interface: Optional[InterfaceClass] = None, + stop_at: Union[bool, ImplicitAcquisitionWrapper] = _marker, +) -> Iterator[Union[ImplicitAcquisitionWrapper, RequestContainer]]: """Iterate over the object ancestors. Optionally filter the ancestors: @@ -754,7 +840,7 @@ def iter_ancestors(obj=None, function=None, interface=None, stop_at=_marker): # # This is useful if we want to have an empty iterator when checking # for ancestors in the portal. - return iter(()) + yield from () chain = aq_chain(aq_inner(obj)) @@ -768,6 +854,7 @@ def iter_ancestors(obj=None, function=None, interface=None, stop_at=_marker): else: end = None + ancestors: Union[Iterator[Union[ImplicitAcquisitionWrapper, RequestContainer]], Any] ancestors = islice(chain, 1, end) if interface is not None: @@ -780,7 +867,12 @@ def iter_ancestors(obj=None, function=None, interface=None, stop_at=_marker): @required_parameters("obj") -def get_closest_ancestor(obj=None, function=None, interface=None, stop_at=_marker): +def get_closest_ancestor( + obj: Optional[Union[ImplicitAcquisitionWrapper, object]] = None, + function: Optional[Callable] = None, + interface: Optional[InterfaceClass] = None, + stop_at: Union[bool, ImplicitAcquisitionWrapper] = _marker, +) -> Optional[ImplicitAcquisitionWrapper]: """Get the closest ancestor that matches the criteria. See :func:`~plone.api.content.iter_ancestors` for more information on the parameters. diff --git a/src/plone/api/portal.py b/src/plone/api/portal.py index a3d21b5c..d5e0d72c 100644 --- a/src/plone/api/portal.py +++ b/src/plone/api/portal.py @@ -1,6 +1,11 @@ """Module that provides various utility methods on the portal level.""" from Acquisition import aq_inner +from Acquisition import ImplicitAcquisitionWrapper +from datetime import date +from datetime import datetime +from DateTime.DateTime import DateTime +from email.mime.multipart import MIMEMultipart from email.utils import formataddr from email.utils import parseaddr from logging import getLogger @@ -12,14 +17,21 @@ from Products.CMFCore.interfaces import ISiteRoot from Products.CMFCore.utils import getToolByName from Products.statusmessages.interfaces import IStatusMessage +from typing import Any +from typing import List +from typing import Optional +from typing import Union from zope.component import ComponentLookupError from zope.component import getUtilitiesFor from zope.component import getUtility from zope.component import providedBy from zope.component.hooks import getSite from zope.globalrequest import getRequest +from zope.interface.interface import InterfaceClass from zope.interface.interfaces import IInterface from zope.schema.interfaces import IVocabularyFactory +from zope.schema.vocabulary import SimpleVocabulary +from ZPublisher.HTTPRequest import HTTPRequest import datetime as dtime import re @@ -52,7 +64,7 @@ MISSING = object() -def get(): +def get() -> ImplicitAcquisitionWrapper: """Get the Plone portal object out of thin air. Without the need to import fancy Interfaces and doing multi adapter @@ -76,7 +88,9 @@ def get(): @required_parameters("context") -def get_navigation_root(context=None): +def get_navigation_root( + context: Optional[ImplicitAcquisitionWrapper] = None, +) -> ImplicitAcquisitionWrapper: """Get the navigation root object for the context. This traverses the path up and returns the nearest navigation root. @@ -93,7 +107,7 @@ def get_navigation_root(context=None): @required_parameters("name") -def get_tool(name=None): +def get_tool(name: Optional[str] = None) -> ImplicitAcquisitionWrapper: """Get a portal tool in a simple way. :param name: [required] Name of the tool you want. @@ -123,11 +137,11 @@ def get_tool(name=None): @required_parameters("recipient", "subject", "body") def send_email( - sender=None, - recipient=None, - subject=None, - body=None, - immediate=False, + sender: Optional[str] = None, + recipient: Optional[str] = None, + subject: Optional[str] = None, + body: Optional[Union[MIMEMultipart, str]] = None, + immediate: bool = False, ): """Send an email. @@ -171,11 +185,6 @@ def send_email( # formataddr probably got confused by special characters. sender = from_address - # If the mail headers are not properly encoded we need to extract - # them and let MailHost manage the encoding. - if isinstance(body, str): - body = body.encode(encoding) - host = get_tool("MailHost") host.send( body, @@ -188,7 +197,11 @@ def send_email( @required_parameters("datetime") -def get_localized_time(datetime=None, long_format=False, time_only=False): +def get_localized_time( + datetime: Optional[Union[date, DateTime, datetime]] = None, + long_format: bool = False, + time_only: bool = False, +) -> str: """Display a date/time in a user-friendly way. It should be localized to the user's preferred language. @@ -236,7 +249,11 @@ def get_localized_time(datetime=None, long_format=False, time_only=False): @required_parameters("message") -def show_message(message=None, request=None, type="info"): +def show_message( + message: Optional[str] = None, + request: Optional[HTTPRequest] = None, + type: str = "info", +): """Display a status message. :param message: [required] Message to show. @@ -255,7 +272,11 @@ def show_message(message=None, request=None, type="info"): @required_parameters("name") -def get_registry_record(name=None, interface=None, default=MISSING): +def get_registry_record( + name: Optional[str] = None, + interface: Optional[InterfaceClass] = None, + default: Any = MISSING, +) -> Any: """Get a record value from ``plone.app.registry``. :param name: [required] Name @@ -322,7 +343,11 @@ def get_registry_record(name=None, interface=None, default=MISSING): @required_parameters("name", "value") -def set_registry_record(name=None, value=None, interface=None): +def set_registry_record( + name: Optional[str] = None, + value: Any = None, + interface: Optional[InterfaceClass] = None, +): """Set a record value in the ``plone.app.registry``. :param name: [required] Name of the record @@ -374,7 +399,7 @@ def set_registry_record(name=None, value=None, interface=None): registry[name] = value -def get_default_language(): +def get_default_language() -> str: """Return the default language. :returns: language identifier @@ -388,7 +413,7 @@ def get_default_language(): return settings.default_language -def get_current_language(context=None): +def get_current_language(context: Optional[ImplicitAcquisitionWrapper] = None) -> str: """Return the current negotiated language. :param context: context object @@ -405,7 +430,7 @@ def get_current_language(context=None): ) -def translate(msgid, domain="plone", lang=None): +def translate(msgid: str, domain: str = "plone", lang: Optional[str] = None) -> str: """Translate a message into a given language. Default to current negotiated language if no target language specified. @@ -437,7 +462,9 @@ def translate(msgid, domain="plone", lang=None): @required_parameters("name") -def get_vocabulary(name=None, context=None): +def get_vocabulary( + name: Optional[str] = None, context: Optional[ImplicitAcquisitionWrapper] = None +) -> SimpleVocabulary: """Return a vocabulary object with the given name. :param name: Name of the vocabulary. @@ -464,7 +491,7 @@ def get_vocabulary(name=None, context=None): return vocabulary(context) -def get_vocabulary_names(): +def get_vocabulary_names() -> List[str]: """Return a list of vocabulary names. :returns: A sorted list of vocabulary names. diff --git a/src/plone/api/relation.py b/src/plone/api/relation.py index 10a0fa10..341e085f 100644 --- a/src/plone/api/relation.py +++ b/src/plone/api/relation.py @@ -4,6 +4,7 @@ """ from AccessControl.SecurityManagement import getSecurityManager +from Acquisition import ImplicitAcquisitionWrapper from collections import defaultdict from importlib.metadata import distribution from importlib.metadata import PackageNotFoundError @@ -14,6 +15,13 @@ from plone.app.linkintegrity.utils import referencedRelationship from plone.base.utils import base_hasattr from plone.dexterity.utils import iterSchemataForType +from plone.supermodel.model import SchemaClass +from typing import cast +from typing import DefaultDict +from typing import List +from typing import Optional +from typing import Tuple +from typing import Union from z3c.relationfield import event from z3c.relationfield import RelationValue from z3c.relationfield.schema import Relation @@ -23,8 +31,10 @@ from zope.component import getUtility from zope.intid.interfaces import IIntIds from zope.lifecycleevent import modified +from zope.schema._bootstrapfields import Field import logging +import z3c.relationfield.relation try: @@ -33,14 +43,19 @@ ITERATE_RELATION_NAME = None StagingRelationValue = None else: - from plone.app.iterate.dexterity import ITERATE_RELATION_NAME - from plone.app.iterate.dexterity.relation import StagingRelationValue + # isort: off + from plone.app.iterate.dexterity import ITERATE_RELATION_NAME # type: ignore[no-redef] + from plone.app.iterate.dexterity.relation import StagingRelationValue # type: ignore[no-redef] + + # isort: on logger = logging.getLogger(__name__) -def _get_field_and_schema_for_fieldname(field_id, portal_type): +def _get_field_and_schema_for_fieldname( + field_id: str, portal_type: str +) -> Optional[Tuple[Field, SchemaClass]]: """Get field and its schema from a portal_type.""" # Turn form.widgets.IDublinCore.title into title field_id = field_id.split(".")[-1] @@ -48,16 +63,20 @@ def _get_field_and_schema_for_fieldname(field_id, portal_type): field = schema.get(field_id, None) if field is not None: return (field, schema) + return None @at_least_one_of("source", "target", "relationship") def get( - source=None, - target=None, - relationship=None, - unrestricted=False, - as_dict=False, -): + source: Optional[ImplicitAcquisitionWrapper] = None, + target: Optional[ImplicitAcquisitionWrapper] = None, + relationship: Optional[str] = None, + unrestricted: bool = False, + as_dict: bool = False, +) -> Union[ + List[z3c.relationfield.relation.RelationValue], + DefaultDict[str, List[z3c.relationfield.relation.RelationValue]], +]: """Get specific relations given a source/target/relationship. :param source: Object that the relations originate from. @@ -71,7 +90,8 @@ def get( :param as_dict: If true, return a dictionary with the relationship name as keys. :type id: bool - :returns: A list of relations + :returns: A list of relations or a dict of lists of relations + if as_dict is True. :rtype: List of RelationValue objects :Example: :ref:`relation-get-example` @@ -91,10 +111,13 @@ def get( intids = getUtility(IIntIds) relation_catalog = getUtility(ICatalog) query = {} - results = [] if as_dict: - results = defaultdict(list) + results: DefaultDict[str, List[z3c.relationfield.relation.RelationValue]] = ( + defaultdict(list) + ) + else: + results: List[z3c.relationfield.relation.RelationValue] = [] # type: ignore [no-redef] if not relation_catalog: return results @@ -124,19 +147,23 @@ def get( if as_dict: results[relation.from_attribute].append(relation) else: - results.append(relation) + results.append(relation) # type: ignore [attr-defined] else: continue else: if as_dict: results[relation.from_attribute].append(relation) else: - results.append(relation) + results.append(relation) # type: ignore [attr-defined] return results @required_parameters("source", "target", "relationship") -def create(source=None, target=None, relationship=None): +def create( + source: Optional[ImplicitAcquisitionWrapper] = None, + target: Optional[ImplicitAcquisitionWrapper] = None, + relationship: Optional[str] = None, +): """Create a relation from source to target using zc.relation. :param source: [required] Object that the relation will originate from. @@ -193,7 +220,7 @@ def create(source=None, target=None, relationship=None): # This can only get a field from a dexterity item. field_and_schema = _get_field_and_schema_for_fieldname( from_attribute, - source.portal_type, + cast(ImplicitAcquisitionWrapper, source).portal_type, ) if field_and_schema is None: @@ -201,8 +228,8 @@ def create(source=None, target=None, relationship=None): # Only create a relation. logger.debug( "No dexterity field. Setting relation %s from %s to %s", - source.absolute_url(), - target.absolute_url(), + cast(ImplicitAcquisitionWrapper, source).absolute_url(), + cast(ImplicitAcquisitionWrapper, target).absolute_url(), relationship, ) event._setRelation(source, from_attribute, RelationValue(to_id)) @@ -214,8 +241,8 @@ def create(source=None, target=None, relationship=None): logger.info( "Add relation to relationlist %s from %s to %s", from_attribute, - source.absolute_url(), - target.absolute_url(), + cast(ImplicitAcquisitionWrapper, source).absolute_url(), + cast(ImplicitAcquisitionWrapper, target).absolute_url(), ) if not has_relation: existing_relations = getattr(source, from_attribute, None) or [] @@ -228,8 +255,8 @@ def create(source=None, target=None, relationship=None): logger.info( "Add relation %s from %s to %s", from_attribute, - source.absolute_url(), - target.absolute_url(), + cast(ImplicitAcquisitionWrapper, source).absolute_url(), + cast(ImplicitAcquisitionWrapper, target).absolute_url(), ) if not has_relation: setattr(source, from_attribute, RelationValue(to_id)) @@ -247,13 +274,17 @@ def create(source=None, target=None, relationship=None): "which is not a relation field. Is this what you wanted? " "Relation points from %s to %s", from_attribute, - source.absolute_url(), - target.absolute_url(), + cast(ImplicitAcquisitionWrapper, source).absolute_url(), + cast(ImplicitAcquisitionWrapper, target).absolute_url(), ) @at_least_one_of("source", "target", "relationship") -def delete(source=None, target=None, relationship=None): +def delete( + source: Optional[ImplicitAcquisitionWrapper] = None, + target: Optional[ImplicitAcquisitionWrapper] = None, + relationship: Optional[str] = None, +): """Delete relation or relations. :param source: Object that the relation originates from. diff --git a/src/plone/api/tests/base.py b/src/plone/api/tests/base.py index a29fcf5c..c68e6f1c 100644 --- a/src/plone/api/tests/base.py +++ b/src/plone/api/tests/base.py @@ -1,5 +1,6 @@ """Base module for unittesting.""" +from Acquisition import ImplicitAcquisitionWrapper from plone.app.testing import FunctionalTesting from plone.app.testing import IntegrationTesting from plone.app.testing import login @@ -8,12 +9,17 @@ from plone.app.testing import setRoles from plone.app.testing import TEST_USER_ID from plone.app.testing import TEST_USER_NAME +from plone.testing.zca import NamedConfigurationMachine class PloneApiLayer(PloneSandboxLayer): defaultBases = (PLONE_FIXTURE,) - def setUpZope(self, app, configurationContext): + def setUpZope( + self, + app: ImplicitAcquisitionWrapper, + configurationContext: NamedConfigurationMachine, + ): """Prepare Zope instance by loading appropriate ZCMLs.""" import plone.app.dexterity @@ -25,7 +31,7 @@ def setUpZope(self, app, configurationContext): self.loadZCML(package=plone.app.contenttypes) - def setUpPloneSite(self, portal): + def setUpPloneSite(self, portal: ImplicitAcquisitionWrapper): """Prepare a Plone instance for testing.""" # Install into Plone site using portal_setup self.applyProfile(portal, "Products.CMFPlone:plone") @@ -38,7 +44,7 @@ def setUpPloneSite(self, portal): setRoles(portal, TEST_USER_ID, ["Manager"]) login(portal, TEST_USER_NAME) - def tearDownZope(self, app): + def tearDownZope(self, app: ImplicitAcquisitionWrapper): """Tear down Zope.""" diff --git a/src/plone/api/tests/test_content.py b/src/plone/api/tests/test_content.py index c19a7337..3c8bbb76 100644 --- a/src/plone/api/tests/test_content.py +++ b/src/plone/api/tests/test_content.py @@ -1,6 +1,7 @@ """Tests for plone.api.content.""" from Acquisition import aq_base +from Acquisition import ImplicitAcquisitionWrapper from OFS.CopySupport import CopyError from OFS.event import ObjectWillBeMovedEvent from OFS.interfaces import IObjectWillBeMovedEvent @@ -914,7 +915,7 @@ def test_delete_with_resolved_internal_breaches(self): self.assertNotIn("blog", self.portal.keys()) self.assertNotIn("training", self.portal["events"].keys()) - def _set_text(self, obj, text): + def _set_text(self, obj: ImplicitAcquisitionWrapper, text: str): obj.text = RichTextValue(text, "text/html", "text/x-html-safe") modified(obj) diff --git a/src/plone/api/tests/test_doctests.py b/src/plone/api/tests/test_doctests.py index 5a94579e..e25b46b6 100644 --- a/src/plone/api/tests/test_doctests.py +++ b/src/plone/api/tests/test_doctests.py @@ -9,8 +9,11 @@ from plone.app.testing import TEST_USER_ID from plone.app.testing import TEST_USER_NAME from plone.app.testing import TEST_USER_PASSWORD +from plone.app.testing.layers import IntegrationTesting from plone.testing import layered from plone.testing.zope import Browser +from typing import Callable +from unittest.suite import TestSuite from zope.testing import renormalizing import doctest @@ -48,7 +51,7 @@ ) -def setUp(self): # pragma: no cover +def setUp(self: manuel.testing.TestCase): # pragma: no cover """Shared test environment set-up, ran before every test.""" layer = self.globs["layer"] # Update global variables within the tests. @@ -79,11 +82,11 @@ def setUp(self): # pragma: no cover def DocFileSuite( - testfile, - flags=FLAGS, - setUp=setUp, - layer=PLONE_INTEGRATION_TESTING, -): + testfile: str, + flags: int = FLAGS, + setUp: Callable = setUp, + layer: IntegrationTesting = PLONE_INTEGRATION_TESTING, +) -> TestSuite: """Returns a test suite configured with a test layer. :param testfile: Path to a doctest file. @@ -114,7 +117,7 @@ def DocFileSuite( ) -def test_suite(): +def test_suite() -> TestSuite: """Find .md files and test code examples in them.""" path = "doctests" doctests = [] diff --git a/src/plone/api/tests/test_env.py b/src/plone/api/tests/test_env.py index c82e4af5..865c0f39 100644 --- a/src/plone/api/tests/test_env.py +++ b/src/plone/api/tests/test_env.py @@ -6,6 +6,7 @@ from plone.api.tests.base import INTEGRATION_TESTING from plone.app.testing import TEST_USER_ID from plone.app.testing import TEST_USER_PASSWORD +from typing import List import AccessControl import unittest @@ -35,7 +36,7 @@ class TestException(Exception): class HasProtectedMethods(SimpleItem): security = AccessControl.ClassSecurityInfo() - def __init__(self, id): + def __init__(self, id: str): self.id = id @security.public @@ -117,11 +118,11 @@ def tearDown(self): """Shared test environment clean-up, ran after every test.""" AccessControl.SecurityManagement.setSecurityManager(self._old_sm) - def should_allow(self, names): + def should_allow(self, names: List[str]): for name in names: self.portal.hpm.restrictedTraverse(name) - def should_forbid(self, names): + def should_forbid(self, names: List[str]): for name in names: with self.assertRaises(Unauthorized): self.portal.hpm.restrictedTraverse(name) diff --git a/src/plone/api/tests/test_portal.py b/src/plone/api/tests/test_portal.py index a822aab5..da2ca4c9 100644 --- a/src/plone/api/tests/test_portal.py +++ b/src/plone/api/tests/test_portal.py @@ -2,6 +2,7 @@ from datetime import date from datetime import datetime +from email import message_from_bytes from packaging import version from plone.api import content from plone.api import env @@ -15,6 +16,7 @@ from Products.CMFPlone.tests.utils import MockMailHost from Products.MailHost.interfaces import IMailHost from unittest import mock +from unittest.mock import MagicMock from zope import schema from zope.component import getUtility from zope.component.hooks import setSite @@ -26,14 +28,6 @@ import unittest -try: - # Python 3 - from email import message_from_bytes -except ImportError: - # Python 2 - from email import message_from_string as message_from_bytes - - HAS_PLONE5 = version.parse(env.plone_version()) >= version.parse("5.0b2") @@ -146,7 +140,7 @@ def test_get_with_sub_site(self): setSite(self.portal) @mock.patch("plone.api.portal.getSite") - def test_get_no_site(self, getSite): + def test_get_no_site(self, getSite: MagicMock): """Test error msg when getSite() returns None.""" getSite.return_value = None from plone.api.exc import CannotGetPortalError @@ -278,7 +272,7 @@ def test_send_email_without_configured_mailhost(self): self.portal.MailHost.smtp_host = old_smtp_host @mock.patch("plone.api.portal.parseaddr") - def test_send_email_parseaddr(self, mock_parseaddr): + def test_send_email_parseaddr(self, mock_parseaddr: MagicMock): """Simulate faulty parsing in parseaddr, from_address should be default email_from_address. """ diff --git a/src/plone/api/tests/test_user.py b/src/plone/api/tests/test_user.py index a7140bdb..1e729a66 100644 --- a/src/plone/api/tests/test_user.py +++ b/src/plone/api/tests/test_user.py @@ -41,7 +41,7 @@ def _check_userid_and_username_different(self): username = user.getUserName() self.assertNotEqual(userid, username) - def _set_emaillogin(self, value): + def _set_emaillogin(self, value: bool): api.portal.set_registry_record("plone.use_email_as_login", value) def test_create_no_email(self): diff --git a/src/plone/api/tests/test_validation.py b/src/plone/api/tests/test_validation.py index b35a9dc0..603001ba 100644 --- a/src/plone/api/tests/test_validation.py +++ b/src/plone/api/tests/test_validation.py @@ -5,11 +5,14 @@ from plone.api.validation import at_least_one_of from plone.api.validation import mutually_exclusive_parameters from plone.api.validation import required_parameters +from typing import Optional import unittest -def undecorated_func(arg1=None, arg2=None, arg3=None): +def undecorated_func( + arg1: Optional[str] = None, arg2: Optional[str] = None, arg3: Optional[str] = None +): return "foo" diff --git a/src/plone/api/user.py b/src/plone/api/user.py index 96a47083..b3923f24 100644 --- a/src/plone/api/user.py +++ b/src/plone/api/user.py @@ -2,6 +2,8 @@ from AccessControl.Permission import getPermissions from AccessControl.SecurityManagement import getSecurityManager +from AccessControl.users import SpecialUser +from Acquisition import ImplicitAcquisitionWrapper from contextlib import contextmanager from plone.api import env from plone.api import portal @@ -14,18 +16,27 @@ from plone.api.validation import required_parameters from Products.CMFPlone.RegistrationTool import get_member_by_login_name from Products.PlonePAS.interfaces.plugins import ILocalRolesPlugin +from Products.PlonePAS.tools.groupdata import GroupData +from Products.PlonePAS.tools.memberdata import MemberData +from typing import cast +from typing import Dict +from typing import Iterator +from typing import List +from typing import Optional +from typing import Tuple +from typing import Union import random import string def create( - email=None, - username=None, - password=None, - roles=("Member",), - properties=None, -): + email: Optional[str] = None, + username: Optional[str] = None, + password: Optional[str] = None, + roles: Union[List[str], Tuple[str, ...]] = ("Member",), + properties: Optional[Dict[str, str]] = None, +) -> MemberData: """Create a user. :param email: [required] Email for the new user. @@ -66,7 +77,7 @@ def create( ) registration = portal.get_tool("portal_registration") - user_id = use_email_as_username and email or username + user_id = cast(str, use_email_as_username and email or username) # Generate a random 8-char password if not password: @@ -82,12 +93,14 @@ def create( roles, properties=properties, ) - return get(username=user_id) + return cast(MemberData, get(username=user_id)) @mutually_exclusive_parameters("userid", "username") @at_least_one_of("userid", "username") -def get(userid=None, username=None): +def get( + userid: Optional[str] = None, username: Optional[str] = None +) -> Optional[MemberData]: """Get a user. Plone provides both a unique, unchanging identifier for a user (the @@ -121,11 +134,11 @@ def get(userid=None, username=None): ) -def get_current(): +def get_current() -> Union[SpecialUser, MemberData]: """Get the currently logged-in user. :returns: Currently logged-in user - :rtype: MemberData object + :rtype: MemberData object or SpecialUser for anonymous user :Example: :ref:`user-get-current-example` """ portal_membership = portal.get_tool("portal_membership") @@ -133,7 +146,9 @@ def get_current(): @mutually_exclusive_parameters("groupname", "group") -def get_users(groupname=None, group=None): +def get_users( + groupname: Optional[str] = None, group: Optional[GroupData] = None +) -> List[MemberData]: """Get all users or all users filtered by group. Arguments ``group`` and ``groupname`` are mutually exclusive. @@ -166,7 +181,7 @@ def get_users(groupname=None, group=None): @mutually_exclusive_parameters("username", "user") @at_least_one_of("username", "user") -def delete(username=None, user=None): +def delete(username: Optional[str] = None, user: Optional[MemberData] = None): """Delete a user. Arguments ``username`` and ``user`` are mutually exclusive. You can either @@ -182,11 +197,11 @@ def delete(username=None, user=None): :Example: :ref:`user-delete-example` """ portal_membership = portal.get_tool("portal_membership") - user_id = username or user.id + user_id = username or cast(MemberData, user).id portal_membership.deleteMembers((user_id,)) -def is_anonymous(): +def is_anonymous() -> bool: """Check if the currently logged-in user is anonymous. :returns: True if the current user is anonymous, False otherwise. @@ -197,7 +212,12 @@ def is_anonymous(): @mutually_exclusive_parameters("username", "user") -def get_roles(username=None, user=None, obj=None, inherit=True): +def get_roles( + username: Optional[str] = None, + user: Optional[MemberData] = None, + obj: Optional[ImplicitAcquisitionWrapper] = None, + inherit: bool = True, +) -> Union[List[str], Tuple[str, ...]]: """Get user's site-wide or local roles. Arguments ``username`` and ``user`` are mutually exclusive. You @@ -255,13 +275,17 @@ def get_roles(username=None, user=None, obj=None, inherit=True): @contextmanager -def _nop_context_manager(): +def _nop_context_manager() -> Iterator[None]: """Do nothing (trivial context manager).""" yield @mutually_exclusive_parameters("username", "user") -def get_permissions(username=None, user=None, obj=None): +def get_permissions( + username: Optional[str] = None, + user: Optional[MemberData] = None, + obj: Optional[ImplicitAcquisitionWrapper] = None, +) -> Dict[str, bool]: """Get user's site-wide or local permissions. Arguments ``username`` and ``user`` are mutually exclusive. You @@ -296,7 +320,12 @@ def get_permissions(username=None, user=None, obj=None): @mutually_exclusive_parameters("username", "user") -def has_permission(permission, username=None, user=None, obj=None): +def has_permission( + permission: str, + username: Optional[str] = None, + user: Optional[MemberData] = None, + obj: Optional[ImplicitAcquisitionWrapper] = None, +) -> bool: """Check whether this user has the given permission. Arguments ``username`` and ``user`` are mutually exclusive. You @@ -343,7 +372,12 @@ def has_permission(permission, username=None, user=None, obj=None): @required_parameters("roles") @mutually_exclusive_parameters("username", "user") -def grant_roles(username=None, user=None, obj=None, roles=None): +def grant_roles( + username: Optional[str] = None, + user: Optional[MemberData] = None, + obj: Optional[ImplicitAcquisitionWrapper] = None, + roles: Optional[Union[List[str], Tuple[str, ...]]] = None, +): """Grant roles to a user. Arguments ``username`` and ``user`` are mutually exclusive. You @@ -374,8 +408,10 @@ def grant_roles(username=None, user=None, obj=None, roles=None): roles = list(roles) # These roles cannot be granted - if "Anonymous" in roles or "Authenticated" in roles: - raise InvalidParameterError + if "Anonymous" in roles or "Authenticated" in roles or not isinstance(roles, list): # type: ignore + raise InvalidParameterError( + "Roles must be a list of strings and cannot include 'Anonymous' or 'Authenticated'" + ) if obj is None: actual_roles = get_roles(user=user) @@ -392,7 +428,12 @@ def grant_roles(username=None, user=None, obj=None, roles=None): @required_parameters("roles") @mutually_exclusive_parameters("username", "user") -def revoke_roles(username=None, user=None, obj=None, roles=None): +def revoke_roles( + username: Optional[str] = None, + user: Optional[MemberData] = None, + obj: Optional[ImplicitAcquisitionWrapper] = None, + roles: Optional[Union[List[str], Tuple[str, ...]]] = None, +): """Revoke roles from a user. Arguments ``username`` and ``user`` are mutually exclusive. You @@ -418,9 +459,12 @@ def revoke_roles(username=None, user=None, obj=None, roles=None): if user is None: raise InvalidParameterError("User could not be found") - roles = set(roles) + if not isinstance(roles, (list, tuple)): + raise InvalidParameterError("Roles must be a list or a tuple of strings") + + roles_set = set(roles) - if "Anonymous" in roles or "Authenticated" in roles: + if "Anonymous" in roles_set or "Authenticated" in roles_set: raise InvalidParameterError inherit = True @@ -434,7 +478,7 @@ def revoke_roles(username=None, user=None, obj=None, roles=None): if role not in ["Anonymous", "Authenticated"] } - roles = list(actual_roles - roles) + roles = list(actual_roles - roles_set) if obj is None: user.setSecurityProfile(roles=roles) diff --git a/src/plone/api/validation.py b/src/plone/api/validation.py index d8e52b71..adf09144 100644 --- a/src/plone/api/validation.py +++ b/src/plone/api/validation.py @@ -3,11 +3,20 @@ from decorator import decorator from plone.api.exc import InvalidParameterError from plone.api.exc import MissingParameterError +from typing import Any +from typing import Callable +from typing import Dict +from typing import List +from typing import Tuple +from typing import Union import inspect -def _get_arg_spec(func, validator_args): +def _get_arg_spec( + func: Callable, + validator_args: Tuple[str, ...], +) -> List[str]: """Get the arguments specified in the function spec. and check that the decorator doesn't refer to non-existent args. @@ -26,7 +35,11 @@ def _get_arg_spec(func, validator_args): return signature_args -def _get_supplied_args(signature_params, args, kwargs): +def _get_supplied_args( + signature_params: Union[List[str], Tuple[str, ...]], + args: Any, + kwargs: Dict[str, Any], +) -> List[Any]: """Return names of all args that have been passed in. either as positional or keyword arguments, and are not None. @@ -43,7 +56,7 @@ def _get_supplied_args(signature_params, args, kwargs): return supplied_args -def required_parameters(*required_params): +def required_parameters(*required_params: Tuple[str, ...]) -> Callable: """Test whether all of the specified parameters have been supplied and are not None. Todo: add an optional flag to allow None values through as valid parameters @@ -77,7 +90,7 @@ def wrapped(function, *args, **kwargs): return _required_parameters -def mutually_exclusive_parameters(*exclusive_params): +def mutually_exclusive_parameters(*exclusive_params: Tuple[str, ...]) -> Callable: """Provide decorator. The decorator raises an exception if more than one @@ -113,7 +126,7 @@ def wrapped(function, *args, **kwargs): return _mutually_exclusive_parameters -def at_least_one_of(*candidate_params): +def at_least_one_of(*candidate_params: Tuple[str, ...]) -> Callable: """Provide a decorator. The decorator raises an exception if none of the