# Source code for keystone.api.validation.validators

# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

"""Internal implementation of request/response validating middleware."""

import re

import jsonschema
from jsonschema import exceptions as jsonschema_exc
from oslo_utils import timeutils
from oslo_utils import uuidutils
import webob.exc

from keystone.common import utils
from keystone import exception
from keystone.i18n import _


def _soft_validate_additional_properties(
    validator,
    additional_properties_value,
    param_value,
    schema,
):
    """Validator function.

    If there are not any properties on the param_value that are not specified
    in the schema, this will return without any effect. If there are any such
    extra properties, they will be handled as follows:

    - if the validator passed to the method is not of type "object", this
      method will return without any effect.
    - if the 'additional_properties_value' parameter is True, this method will
      return without any effect.
    - if the schema has an additionalProperties value of True, the extra
      properties on the param_value will not be touched.
    - if the schema has an additionalProperties value of False and there
      aren't patternProperties specified, the extra properties will be stripped
      from the param_value.
    - if the schema has an additionalProperties value of False and there
      are patternProperties specified, the extra properties will not be
      touched and raise validation error if pattern doesn't match.
    """
    if not (
        validator.is_type(param_value, "object") or additional_properties_value
    ):
        return

    properties = schema.get("properties", {})
    patterns = "|".join(schema.get("patternProperties", {}))
    extra_properties = set()
    for prop in param_value:
        if prop not in properties:
            if patterns:
                if not re.search(patterns, prop):
                    extra_properties.add(prop)
            else:
                extra_properties.add(prop)

    if not extra_properties:
        return

    if patterns:
        error = "Additional properties are not allowed (%s %s unexpected)"
        if len(extra_properties) == 1:
            verb = "was"
        else:
            verb = "were"
        yield jsonschema_exc.ValidationError(
            error
            % (", ".join(repr(extra) for extra in extra_properties), verb)
        )
    else:
        for prop in extra_properties:
            del param_value[prop]


def _validate_string_length(
    value,
    entity_name,
    mandatory=False,
    min_length=0,
    max_length=None,
    remove_whitespaces=False,
):
    """Check the length of specified string.

    :param value: the value of the string
    :param entity_name: the name of the string
    :mandatory: string is mandatory or not
    :param min_length: the min_length of the string
    :param max_length: the max_length of the string
    :param remove_whitespaces: True if trimming whitespaces is needed else
        False
    """
    if not mandatory and not value:
        return True

    if mandatory and not value:
        msg = _("The '%s' can not be None.") % entity_name
        raise webob.exc.HTTPBadRequest(explanation=msg)

    if remove_whitespaces:
        value = value.strip()

    utils.check_string_length(
        value, entity_name, min_length=min_length, max_length=max_length
    )


@jsonschema.FormatChecker.cls_checks("date-time")
def _validate_datetime_format(instance: object) -> bool:
    """Check the "date-time" format.

    Non-string instances always pass; a string passes only if
    ``timeutils.parse_isotime`` accepts it without raising ``ValueError``.
    """
    # format checks constrain to the relevant primitive type
    # https://github.com/OAI/OpenAPI-Specification/issues/3148
    if isinstance(instance, str):
        try:
            timeutils.parse_isotime(instance)
        except ValueError:
            return False
    return True


@jsonschema.FormatChecker.cls_checks("uuid")
def _validate_uuid_format(instance: object) -> bool:
    """Check the "uuid" format.

    Non-string instances always pass; strings are judged by
    ``uuidutils.is_uuid_like``.
    """
    # format checks constrain to the relevant primitive type
    # https://github.com/OAI/OpenAPI-Specification/issues/3148
    is_string = isinstance(instance, str)
    return uuidutils.is_uuid_like(instance) if is_string else True


class FormatChecker(jsonschema.FormatChecker):
    """A FormatChecker that reports failures with a readable message.

    We need understandable validation error messages for users. Whenever a
    registered checker rejects a value, or raises one of the exception types
    it was registered with, ``check`` raises a ``FormatError`` whose message
    names the value and the format and whose cause is the checker's exception
    (if any).
    """

    def check(self, param_value, format):
        """Check whether the param_value conforms to the given format.

        :param param_value: the param_value to check
        :type: any primitive type (str, number, bool)
        :param str format: the format that param_value should conform to
        :raises: :exc:`FormatError` if param_value does not conform to format
        """
        try:
            # For safety reasons custom checkers can be registered with
            # allowed exception types. Anything else will fall into the
            # default formatter.
            checker, expected_errors = self.checkers[format]
        except KeyError:
            # No checker is registered for this format: nothing to verify.
            return

        outcome = None
        failure = None
        try:
            outcome = checker(param_value)
        except expected_errors as exc:
            failure = exc

        if outcome:
            return

        raise jsonschema_exc.FormatError(
            f"{param_value!r} is not a {format!r}", cause=failure
        )


class _SchemaValidator:
    """A validator class.

    This class is changed from Draft202012Validator to validate minimum/maximum
    value of a string number(e.g. '10'). In addition, FormatCheckers are added
    for checking data formats which are common in the Keystone API.
    """

    # Replaced per instance by the extended validator built in __init__.
    validator = None
    # The stock validator class whose keyword implementations are extended.
    validator_org = jsonschema.Draft202012Validator

    def __init__(
        self, schema, relax_additional_properties=False, is_body=True
    ):
        """Build the extended validator for ``schema``.

        :param schema: the JSON schema to validate against
        :param relax_additional_properties: if True, the
            ``additionalProperties`` keyword is handled by
            ``_soft_validate_additional_properties`` instead of the stock
            implementation
        :param is_body: selects the wording of validation error messages:
            True for "field/attribute" messages, False for "query parameters"
            messages
        """
        self.is_body = is_body
        validators = {
            "minimum": self._validate_minimum,
            "maximum": self._validate_maximum,
        }
        if relax_additional_properties:
            validators["additionalProperties"] = (
                _soft_validate_additional_properties
            )

        validator_cls = jsonschema.validators.extend(
            self.validator_org, validators
        )
        format_checker = FormatChecker()
        self.validator = validator_cls(schema, format_checker=format_checker)

    def validate(self, *args, **kwargs):
        """Validate input with the extended validator.

        All arguments are passed through to the underlying validator's
        ``validate`` method.

        :raises keystone.exception.SchemaValidationError: if the input does
            not conform to the schema, or if a ``TypeError`` is raised while
            validating
        """
        try:
            self.validator.validate(*args, **kwargs)
        except jsonschema.ValidationError as ex:
            if ex.path:
                if self.is_body:
                    # NOTE: For consistency across OpenStack services, this
                    # error message has been written in a similar format as
                    # WSME errors. The last path element is read without
                    # mutating the exception.
                    detail = _(
                        "Invalid input for field/attribute %(path)s. "
                        "Value: %(value)s. %(message)s"
                    ) % {
                        "path": ex.path[-1],
                        "value": ex.instance,
                        "message": ex.message,
                    }
                else:
                    # NOTE: We use the first element of 'ex.path' instead of
                    # the last one. This is due to the structure of query
                    # parameters which is a dict with key as name and value is
                    # list. As such, the first item in the 'ex.path' is the key
                    # and second item is the index of list in the value. We
                    # need the key as the parameter name in the error message.
                    detail = _(
                        "Invalid input for query parameters %(path)s. "
                        "Value: %(value)s. %(message)s"
                    ) % {
                        "path": ex.path[0],
                        "value": ex.instance,
                        "message": ex.message,
                    }
            else:
                detail = ex.message
            raise exception.SchemaValidationError(detail=detail) from ex
        except TypeError as ex:
            # NOTE: If passing non string value to patternProperties parameter,
            # TypeError happens. Here is for catching the TypeError.
            detail = str(ex)
            raise exception.SchemaValidationError(detail=detail) from ex

    def _number_from_str(self, param_value):
        """Convert a value to a number for minimum/maximum comparison.

        :param param_value: the value to convert, e.g. ``'10'`` or ``10``
        :returns: the value itself if it is a float, otherwise
            ``int(param_value)``, otherwise ``float(param_value)``, or None
            if neither conversion succeeds
        """
        # A float must be compared as-is: int() would silently truncate it
        # (e.g. 10.5 -> 10) and raises OverflowError for infinities.
        if isinstance(param_value, float):
            return param_value
        try:
            return int(param_value)
        except (ValueError, TypeError):
            pass
        try:
            return float(param_value)
        except (ValueError, TypeError):
            return None

    def _validate_minimum(self, validator, minimum, param_value, schema):
        """Apply the stock ``minimum`` keyword to a number-like value.

        Values that cannot be converted to a number are not checked.
        """
        param_value = self._number_from_str(param_value)
        if param_value is None:
            return
        return self.validator_org.VALIDATORS["minimum"](
            validator, minimum, param_value, schema
        )

    def _validate_maximum(self, validator, maximum, param_value, schema):
        """Apply the stock ``maximum`` keyword to a number-like value.

        Values that cannot be converted to a number are not checked.
        """
        param_value = self._number_from_str(param_value)
        if param_value is None:
            return
        return self.validator_org.VALIDATORS["maximum"](
            validator, maximum, param_value, schema
        )