import asyncio
import logging
from datetime import datetime
from typing import IO, Iterable, List, Optional, Tuple, Type, TypeVar, Union

from asn1crypto import cms, core, tsp, x509
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives import hashes
from pyhanko_certvalidator import (
    CancelableAsyncIterator,
    ValidationContext,
    find_valid_path,
)
from pyhanko_certvalidator.errors import (
    DisallowedAlgorithmError,
    ExpiredError,
    InvalidCertificateError,
    PathBuildingError,
    PathValidationError,
    RevokedError,
    ValidationError,
)
from pyhanko_certvalidator.ltv.errors import TimeSlideFailure
from pyhanko_certvalidator.path import ValidationPath
from pyhanko_certvalidator.policy_decl import (
    AlgorithmUsagePolicy,
    PKIXValidationParams,
)
from pyhanko_certvalidator.validate import ACValidationResult, async_validate_ac

from pyhanko.sign.general import (
    CMSExtractionError,
    CMSStructuralError,
    MultivaluedAttributeError,
    NonexistentAttributeError,
    SignedDataCerts,
    check_ess_certid,
    extract_certificate_info,
    extract_signer_info,
    find_unique_cms_attribute,
    get_pyca_cryptography_hash,
)

from ...pdf_utils import misc
from ..ades.report import AdESFailure, AdESIndeterminate
from . import errors
from .settings import KeyUsageConstraints
from .status import (
    CAdESSignerAttributeAssertions,
    CertifiedAttributes,
    ClaimedAttributes,
    RevocationDetails,
    SignatureStatus,
    StandardCMSSignatureStatus,
    TimestampSignatureStatus,
)
from .utils import (
    DEFAULT_ALGORITHM_USAGE_POLICY,
    extract_message_digest,
    validate_raw,
)

# Names exported by a star-import of this module.
__all__ = [
    'validate_sig_integrity', 'async_validate_cms_signature',
    'collect_timing_info', 'validate_tst_signed_data',
    'async_validate_detached_cms', 'cms_basic_validation',
    'compute_signature_tst_digest', 'extract_tst_data',
    'extract_self_reported_ts', 'extract_certs_for_validation',
    'collect_signer_attr_status', 'validate_algorithm_protection'
]

from ...pdf_utils.misc import lift_iterable_async

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Type variable for signature status classes (bounded by SignatureStatus);
# used to annotate the ``status_cls`` argument of
# async_validate_cms_signature.
StatusType = TypeVar('StatusType', bound=SignatureStatus)


def _grab_signing_cert_attr(signed_attrs, v2: bool):
    """
    Fetch the signing certificate attribute from a list of signed attributes.

    :param signed_attrs:
        The signed attributes to search.
    :param v2:
        If ``True``, look for ``signing_certificate_v2``, otherwise look for
        ``signing_certificate``.
    :return:
        A freshly loaded copy of the attribute value, or ``None`` if the
        attribute is not present.
    :raises errors.SignatureValidationError:
        If the attribute occurs with more than one value.
    """
    # TODO check certificate policies, enforce restrictions on chain of trust
    # TODO document and/or mark as internal API explicitly
    if v2:
        attr_name = 'signing_certificate_v2'
        attr_cls = tsp.SigningCertificateV2
    else:
        attr_name = 'signing_certificate'
        attr_cls = tsp.SigningCertificate

    try:
        raw_value = find_unique_cms_attribute(signed_attrs, attr_name)
    except NonexistentAttributeError:
        return None
    except MultivaluedAttributeError as e:
        # Banned by RFCs -> error
        raise errors.SignatureValidationError(
            "Wrong cardinality for signing certificate attribute",
            ades_subindication=AdESIndeterminate.NO_SIGNING_CERTIFICATE_FOUND
        ) from e

    # Re-encode the value and load it into a new object, so that inspecting
    # it cannot trip the _is_mutated logic on the parent object (the state of
    # the signed attributes must be preserved).
    return attr_cls.load(raw_value.dump())


def _check_signing_certificate(cert: x509.Certificate,
                               signed_attrs: cms.CMSAttributes):
    """
    Check a certificate against the signing certificate attribute in a list
    of signed attributes, if there is one.

    The ``signing_certificate_v2`` attribute takes precedence; the
    ``signing_certificate`` attribute is only consulted if the former is
    absent. If neither is present, no constraint applies and the function
    returns without doing anything.

    :param cert:
        The certificate selected as the signer's certificate.
    :param signed_attrs:
        The signed attributes of the signer info under scrutiny.
    :raises errors.SignatureValidationError:
        If the attribute has the wrong cardinality, does not list any
        certificate identifiers, or if its first certificate identifier
        does not match ``cert``.
    """
    # TODO check certificate policies, enforce restrictions on chain of trust
    # TODO document and/or mark as internal API explicitly

    attr = _grab_signing_cert_attr(signed_attrs, v2=True)
    if attr is None:
        attr = _grab_signing_cert_attr(signed_attrs, v2=False)

    if attr is None:
        # if neither attr is present -> no constraints
        return

    err = AdESIndeterminate.NO_SIGNING_CERTIFICATE_FOUND

    # For the main signer cert, we only care about the first value, the others
    # limit the set of applicable CA certs
    try:
        certid = attr['certs'][0]
    except IndexError as e:
        # The attribute comes from untrusted input; an empty list of
        # identifiers must be reported as a validation failure rather than
        # escaping as a bare IndexError.
        raise errors.SignatureValidationError(
            "Signing certificate attribute does not contain any "
            "certificate identifiers.",
            ades_subindication=err
        ) from e

    if not check_ess_certid(cert, certid):
        raise errors.SignatureValidationError(
            "Signing certificate attribute does not match selected "
            "signer's certificate for subject "
            f"\"{cert.subject.human_friendly}\".",
            ades_subindication=err
        )


def validate_algorithm_protection(
        attrs: cms.CMSAttributes,
        claimed_digest_algorithm_obj: cms.DigestAlgorithm,
        claimed_signature_algorithm_obj: cms.SignedDigestAlgorithm):
    """
    Internal API to validate the CMS algorithm protection attribute
    defined in :rfc:`6211`, if present.

    If the attribute is absent, nothing is checked.

    :param attrs:
        A CMS attribute list.
    :param claimed_digest_algorithm_obj:
        The claimed (i.e. unprotected) digest algorithm value.
    :param claimed_signature_algorithm_obj:
        The claimed (i.e. unprotected) signature algorithm value.
    :raises CMSStructuralError:
        if multiple CMS protection attributes are present
    :raises errors.CMSAlgorithmProtectionError:
        if a mismatch is detected, or if the attribute does not specify
        a signature algorithm
    """

    try:
        protection_attr = find_unique_cms_attribute(
            attrs, 'cms_algorithm_protection'
        )
    except NonexistentAttributeError:
        # TODO make this optional to enforce?
        return
    except MultivaluedAttributeError:
        raise CMSStructuralError(
            "Multiple CMS algorithm protection attributes present",
        )

    if protection_attr is None:
        return

    # the digest algorithm must agree with the protected value
    protected_digest_algo = protection_attr['digest_algorithm'].native
    if protected_digest_algo != claimed_digest_algorithm_obj.native:
        raise errors.CMSAlgorithmProtectionError(
            "Digest algorithm does not match CMS algorithm protection "
            "attribute.",
        )

    # for signed data, the attribute must carry a signature algorithm,
    # and it must agree with the claimed one
    protected_sig_algo = protection_attr['signature_algorithm'].native
    if protected_sig_algo is None:
        raise errors.CMSAlgorithmProtectionError(
            "CMS algorithm protection attribute not valid for signed "
            "data",
        )
    if protected_sig_algo != claimed_signature_algorithm_obj.native:
        raise errors.CMSAlgorithmProtectionError(
            "Signature mechanism does not match CMS algorithm "
            "protection attribute.",
        )


def validate_sig_integrity(
        signer_info: cms.SignerInfo,
        cert: x509.Certificate,
        expected_content_type: str,
        actual_digest: bytes,
        algorithm_usage_policy: Optional[AlgorithmUsagePolicy] = None,
        time_indic: Optional[datetime] = None) \
        -> Tuple[bool, bool]:
    """
    Validate the integrity of a signature for a particular signerInfo object
    inside a CMS signed data container.

    .. warning::
        This function does not do any trust checks, and is considered
        "dangerous" API because it is easy to misuse.

    :param signer_info:
        A :class:`cms.SignerInfo` object.
    :param cert:
        The signer's certificate.

        .. note::
            This function will not attempt to extract certificates from
            the signed data.
    :param expected_content_type:
        The expected value for the content type attribute (as a Python string,
        see :class:`cms.ContentType`).
    :param actual_digest:
        The actual digest to be matched to the message digest attribute.
    :param algorithm_usage_policy:
        Algorithm usage policy.
    :param time_indic:
        Time indication for the production of the signature.
    :return:
        A tuple of two booleans. The first indicates whether the provided
        digest matches the value in the signed attributes.
        The second indicates whether the signature of the digest is valid.

        If the signer info has no signed attributes, the digest is treated
        as the signed payload itself, and the first value is equal to
        the second.
    :raises errors.DisallowedAlgorithmError:
        If the usage policy does not allow the signature algorithm.
    :raises errors.SignatureValidationError:
        If the algorithm protection attribute, the signing certificate
        attribute or the content type attribute fail their checks.
    """

    signature_algorithm: cms.SignedDigestAlgorithm = \
        signer_info['signature_algorithm']
    digest_algorithm_obj = signer_info['digest_algorithm']
    md_algorithm = digest_algorithm_obj['algorithm'].native
    if algorithm_usage_policy is not None:
        algo_allowed = algorithm_usage_policy.signature_algorithm_allowed(
            signature_algorithm,
            moment=time_indic,
            public_key=cert.public_key
        )
        if not algo_allowed:
            msg = (
                f"The algorithm {signature_algorithm['algorithm'].native} "
                f"is not allowed by the current usage policy."
            )
            if algo_allowed.failure_reason is not None:
                msg += f" Reason: {algo_allowed.failure_reason}."
            raise errors.DisallowedAlgorithmError(
                msg, permanent=algo_allowed.not_allowed_after is None
            )
    signature = signer_info['signature'].native

    signed_attrs_orig: cms.CMSAttributes = signer_info['signed_attrs']

    if signed_attrs_orig is core.VOID:
        # No signed attributes: the digest itself is what was signed.
        embedded_digest = None
        prehashed = True
        signed_data = actual_digest
    else:
        # signed_attrs comes with context-specific tagging.
        # We need to re-tag it with a universal SET OF tag.
        signed_attrs = signer_info['signed_attrs'].untag()
        # do this ASAP to minimise the chances of accidentally disturbing
        # the state. We want to tolerate inconsequential deviations from DER,
        # even though CMS mandates strict adherence to DER (not all signers
        # follow that rule)
        # TODO offer a mode with ultra-strict adherence to DER where we call
        #  dump(force=True) here. That requires changes to asn1crypto, though,
        #  since it is too eager to mess with URI values in ways that go beyond
        #  DER.
        signed_data = signed_attrs.dump()
        prehashed = False
        # check the CMSAlgorithmProtection attr, if present
        try:
            validate_algorithm_protection(
                signed_attrs,
                claimed_digest_algorithm_obj=digest_algorithm_obj,
                claimed_signature_algorithm_obj=signature_algorithm
            )
        except CMSStructuralError as e:
            raise errors.SignatureValidationError(
                e.failure_message, ades_subindication=AdESFailure.FORMAT_FAILURE
            ) from e
        except errors.CMSAlgorithmProtectionError as e:
            raise errors.SignatureValidationError(
                e.failure_message,
                # these are conceptually failures, but AdES doesn't have
                # them in its validation model, so 'GENERIC' it is.
                #  (same applies to other such cases)
                ades_subindication=AdESIndeterminate.GENERIC
            ) from e

        # check the signing-certificate or signing-certificate-v2 attr
        # Note: Through the usual "full validation" call path, this check is
        #   performed twice. AdES requires the check to be performed when
        #   selecting the signer's certificate (which happens elsewhere), but
        #   we keep this check for compatibility for those cases where
        #   validate_sig_integrity is used standalone.
        _check_signing_certificate(cert, signed_attrs)

        try:
            content_type = find_unique_cms_attribute(
                signed_attrs, 'content_type'
            )
        except (NonexistentAttributeError, MultivaluedAttributeError) as e:
            raise errors.SignatureValidationError(
                'Content type not found in signature, or multiple content-type '
                'attributes present.',
                ades_subindication=AdESFailure.FORMAT_FAILURE
            ) from e
        content_type = content_type.native
        if content_type != expected_content_type:
            raise errors.SignatureValidationError(
                f'Content type {content_type} did not match expected value '
                f'{expected_content_type}',
                ades_subindication=AdESFailure.FORMAT_FAILURE
            )

        embedded_digest = extract_message_digest(signer_info)

    try:
        validate_raw(
            signature, signed_data, cert, signature_algorithm, md_algorithm,
            prehashed=prehashed, algorithm_policy=algorithm_usage_policy,
            time_indic=time_indic
        )
        valid = True
    except InvalidSignature:
        valid = False

    # With signed attributes, "intact" means that the digest embedded in the
    # attributes matches the supplied digest; without them, the digest was
    # signed directly, so it is intact exactly when the signature is valid.
    intact = (
        actual_digest == embedded_digest
        if embedded_digest is not None else valid
    )

    return intact, valid


def extract_certs_for_validation(signed_data: cms.SignedData) \
        -> SignedDataCerts:
    """
    Extract certificates from a CMS signed data object for validation purposes,
    identifying the signer's certificate in accordance with ETSI EN 319 102-1,
    5.2.3.4.

    :param signed_data:
        The CMS payload.
    :return:
        The extracted certificates.
    :raises errors.SignatureValidationError:
        If the signer's certificate could not be extracted from the signed
        data, or if it does not agree with the signing certificate attribute
        in the signed attributes.
    """

    # TODO allow signer certificate to be obtained from elsewhere?

    try:
        cert_info = extract_certificate_info(signed_data)
        cert = cert_info.signer_cert
    except CMSExtractionError as e:
        # keep the extraction error attached as the cause for diagnostics
        raise errors.SignatureValidationError(
            'signer certificate not included in signature',
            ades_subindication=AdESIndeterminate.NO_SIGNING_CERTIFICATE_FOUND
        ) from e
    signer_info = extract_signer_info(signed_data)
    signed_attrs = signer_info['signed_attrs']
    # check the signing-certificate or signing-certificate-v2 attr
    _check_signing_certificate(cert, signed_attrs)
    return cert_info


async def cms_basic_validation(
        signed_data: cms.SignedData,
        raw_digest: Optional[bytes] = None,
        validation_context: Optional[ValidationContext] = None,
        status_kwargs: Optional[dict] = None,
        validation_path: Optional[ValidationPath] = None,
        pkix_validation_params: Optional[PKIXValidationParams] = None,
        *, key_usage_settings: KeyUsageConstraints) -> dict:
    """
    Perform basic validation of CMS and PKCS#7 signatures in isolation
    (i.e. integrity and trust checks).

    Internal API.

    :param signed_data:
        The ``SignedData`` value to validate.
    :param raw_digest:
        The digest of the signed content. If ``None``, the digest is computed
        over the encapsulated content using the signer info's digest
        algorithm.
    :param validation_context:
        Validation context to use for the trust checks. It also supplies
        the algorithm usage policy and the time indication for the integrity
        check. If ``None``, a default :class:`ValidationContext` is used.
    :param status_kwargs:
        Initial keyword arguments for the status object. The dictionary
        passed in is not modified; the result is returned as a new
        dictionary.
    :param validation_path:
        If supplied, the only candidate path to validate; otherwise candidate
        paths are obtained from the validation context's path builder.
    :param pkix_validation_params:
        PKIX validation parameters to pass on to path validation.
    :param key_usage_settings:
        Key usage constraints to apply to the signer's certificate.
    :return:
        A dictionary containing the entries of ``status_kwargs``, together
        with ``intact``, ``valid``, ``signing_cert``, ``md_algorithm``,
        ``pkcs7_signature_mechanism``, ``trust_problem_indic``,
        ``validation_path`` and ``revocation_details``.
    :raises errors.SignatureValidationError:
        If the signature is structurally unsound, or if the integrity checks
        raise.
    """
    signer_info = extract_signer_info(signed_data)
    cert_info = extract_certs_for_validation(signed_data)
    cert = cert_info.signer_cert
    other_certs = cert_info.other_certs

    algorithm_policy = None
    time_indic = None
    if validation_context is not None:
        algorithm_policy = validation_context.algorithm_policy
        time_indic = validation_context.best_signature_time
    validation_context = validation_context or ValidationContext()
    if algorithm_policy is None:
        algorithm_policy = DEFAULT_ALGORITHM_USAGE_POLICY

    signature_algorithm: cms.SignedDigestAlgorithm = \
        signer_info['signature_algorithm']
    mechanism = signature_algorithm['algorithm'].native
    md_algorithm = signer_info['digest_algorithm']['algorithm'].native
    eci = signed_data['encap_content_info']
    expected_content_type = eci['content_type'].native
    if raw_digest is None:
        # this means that there should be encapsulated data
        raw = bytes(eci['content'])
        md_spec = get_pyca_cryptography_hash(md_algorithm)
        md = hashes.Hash(md_spec)
        md.update(raw)
        raw_digest = md.finalize()

    # first, do the cryptographic identity checks
    # TODO theoretically (e.g. DSA with param inheritance) this requires
    #  doing the X.509 validation step first. Since nobody cares about DSA
    #  (let alone DSA with inherited parameters), that's just a "nice to have".
    try:
        intact, valid = validate_sig_integrity(
            signer_info, cert, expected_content_type=expected_content_type,
            actual_digest=raw_digest, algorithm_usage_policy=algorithm_policy,
            time_indic=time_indic
        )
    except CMSStructuralError as e:
        raise errors.SignatureValidationError(
            "CMS structural error: " + e.failure_message,
            ades_subindication=AdESFailure.FORMAT_FAILURE
        ) from e

    # next, validate trust (only attempted if the signature itself is valid)
    ades_status = path = revo_details = None
    if valid:
        try:
            validation_context.certificate_registry\
                .register_multiple(other_certs)

            if validation_path is not None:
                paths = lift_iterable_async([validation_path])
            else:
                paths = validation_context.path_builder\
                    .async_build_paths_lazy(cert)

            ades_status, revo_details, path = await validate_cert_usage(
                cert, validation_context,
                key_usage_settings=key_usage_settings,
                paths=paths, pkix_validation_params=pkix_validation_params,
            )
        except ValueError as e:
            logger.error("Processing error in validation process", exc_info=e)
            ades_status = AdESIndeterminate.CERTIFICATE_CHAIN_GENERAL_FAILURE

    # Always work on a copy: updating the caller's dictionary in place would
    # leak validation results into an object the caller still owns (and
    # would only happen when that dictionary is non-empty).
    result_kwargs = dict(status_kwargs) if status_kwargs else {}
    result_kwargs.update(
        intact=intact, valid=valid, signing_cert=cert,
        md_algorithm=md_algorithm, pkcs7_signature_mechanism=mechanism,
        trust_problem_indic=ades_status, validation_path=path,
        revocation_details=revo_details
    )
    return result_kwargs


async def validate_cert_usage(
        cert: x509.Certificate,
        validation_context: ValidationContext,
        key_usage_settings: KeyUsageConstraints,
        paths: CancelableAsyncIterator[ValidationPath],
        pkix_validation_params: Optional[PKIXValidationParams] = None):
    """
    Low-level certificate validation routine.
    Internal API.

    Checks the key usage constraints on the certificate and then looks for
    a valid path among the candidates supplied. Both steps run under
    ``handle_certvalidator_errors``, and the triple it reports is returned
    as-is.

    :param cert:
        The certificate to validate.
    :param validation_context:
        The validation context to validate against.
    :param key_usage_settings:
        Key usage constraints to apply to ``cert``.
    :param paths:
        Candidate validation paths.
    :param pkix_validation_params:
        PKIX validation parameters to pass on to path validation.
    :return:
        A tuple ``(ades_status, revo_details, path)``.
    """

    async def _usage_then_path():
        # key usage is validated here directly, without going through
        # pyhanko_certvalidator
        key_usage_settings.validate(cert)
        valid_path = await find_valid_path(
            cert, paths,
            validation_context=validation_context,
            pkix_validation_params=pkix_validation_params
        )
        return valid_path

    outcome = await handle_certvalidator_errors(_usage_then_path())
    ades_status, revo_details, path = outcome
    if ades_status is not None:
        # a status indication was reported -> flag the failure in the log
        logger.warning(
            f"Chain of trust validation for {cert.subject.human_friendly} "
            f"failed."
        )
    return ades_status, revo_details, path


async def async_validate_cms_signature(
                           signed_data: cms.SignedData,
                           status_cls: Type[StatusType] = SignatureStatus,
                           raw_digest: bytes = None,
                           validation_context: ValidationContext = None,
                           status_kwargs: dict = None,
                           key_usage_settings: KeyUsageConstraints = None):
    """
    Validate a CMS signature (i.e. a ``SignedData`` object).

    :param signed_data:
        The :class:`.asn1crypto.cms.SignedData` object to validate.
    :param status_cls:
        Status class to use for the validation result.
    :param raw_digest:
        Raw digest, computed from context.
    :param validation_context:
        Validation context to validate the signer's certificate.
    :param status_kwargs:
        Other keyword arguments to pass to the ``status_class`` when reporting
        validation results.
    :param key_usage_settings:
        A :class:`.KeyUsageConstraints` object specifying which key usages
        must or must not be present in the signer's certificate.
        It is passed through the status class's ``default_usage_constraints``
        before being applied.
    :return:
        A :class:`.SignatureStatus` object (or an instance of a proper subclass)
    """
    # let the status class settle the key usage constraints to enforce
    effective_ku_settings = status_cls.default_usage_constraints(
        key_usage_settings
    )
    validation_result = await cms_basic_validation(
        signed_data, raw_digest, validation_context, status_kwargs,
        key_usage_settings=effective_ku_settings
    )
    return status_cls(**validation_result)


def extract_self_reported_ts(signer_info: cms.SignerInfo) -> Optional[datetime]:
    """
    Read the signer's self-reported signing time, i.e. the value of the
    ``signingTime`` signed attribute.

    Internal API.

    :param signer_info:
        A ``SignerInfo`` value.
    :return:
        The native value of the ``signingTime`` attribute, or ``None`` if
        the attribute is absent or occurs with multiple values.
    """
    try:
        signing_time_attr = find_unique_cms_attribute(
            signer_info['signed_attrs'], 'signing_time'
        )
        return signing_time_attr.native
    except (NonexistentAttributeError, MultivaluedAttributeError):
        return None


def extract_tst_data(signer_info: cms.SignerInfo, signed: bool = False) \
        -> Optional[cms.SignedData]:
    """
    Retrieve the signed data of a timestamp token attached to a signer info.

    Internal API.

    :param signer_info:
        A ``SignerInfo`` value.
    :param signed:
        If ``True``, look for a content timestamp (among the signed
        attributes), else look for a signature timestamp (among the unsigned
        attributes).
    :return:
        The ``content`` entry of the timestamp token found, or ``None``
        if the attribute is absent or occurs with multiple values.
    """
    if signed:
        attrs_field = 'signed_attrs'
        tst_attr_name = 'content_time_stamp'
    else:
        attrs_field = 'unsigned_attrs'
        tst_attr_name = 'signature_time_stamp_token'

    try:
        token = find_unique_cms_attribute(
            signer_info[attrs_field], tst_attr_name
        )
        return token['content']
    except (NonexistentAttributeError, MultivaluedAttributeError):
        return None


def compute_signature_tst_digest(signer_info: cms.SignerInfo) \
        -> Optional[bytes]:
    """
    Hash the signature value of a signer info with the hash algorithm named
    in the message imprint of its signature timestamp token.

    Internal API.

    :param signer_info:
        A ``SignerInfo`` value.
    :return:
        The computed digest, or ``None`` if there is no signature timestamp.
    """

    tst_signed_data = extract_tst_data(signer_info)
    if tst_signed_data is None:
        return None

    # the hash algorithm is dictated by the timestamp token's message imprint
    tst_info = tst_signed_data['encap_content_info']['content'].parsed
    imprint_algo_obj = tst_info['message_imprint']['hash_algorithm']
    imprint_md_name = imprint_algo_obj['algorithm'].native

    sig_value = signer_info['signature'].native
    hasher = hashes.Hash(get_pyca_cryptography_hash(imprint_md_name))
    hasher.update(sig_value)
    return hasher.finalize()

# TODO support signerInfo with multivalued timestamp attributes


async def collect_timing_info(signer_info: cms.SignerInfo,
                              ts_validation_context: ValidationContext,
                              raw_digest: bytes):
    """
    Gather and validate the timing information found in a ``SignerInfo``
    value: the ``signingTime`` attribute, the signature timestamp and the
    content timestamp.

    :param signer_info:
        A ``SignerInfo`` value.
    :param ts_validation_context:
        The timestamp validation context to validate against.
    :param raw_digest:
        The raw external message digest bytes (only relevant for the
        validation of the content timestamp token, if there is one)
    :return:
        A dictionary with a subset of the keys ``signer_reported_dt``,
        ``timestamp_validity`` and ``content_timestamp_validity``; a key is
        only set if the corresponding information is present.
    """

    timing_info = {}

    # self-reported signing time
    self_reported = extract_self_reported_ts(signer_info)
    if self_reported is not None:
        timing_info['signer_reported_dt'] = self_reported

    # signature timestamp: the imprint must match the digest of the signature
    signature_tst = extract_tst_data(signer_info, signed=False)
    if signature_tst is not None:
        expected_imprint = compute_signature_tst_digest(signer_info)
        signature_tst_kwargs = await validate_tst_signed_data(
            signature_tst, ts_validation_context, expected_imprint,
        )
        timing_info['timestamp_validity'] = TimestampSignatureStatus(
            **signature_tst_kwargs
        )

    # content timestamp: the imprint must match the external message digest
    content_tst = extract_tst_data(signer_info, signed=True)
    if content_tst is not None:
        content_tst_kwargs = await validate_tst_signed_data(
            content_tst, ts_validation_context,
            expected_tst_imprint=raw_digest
        )
        timing_info['content_timestamp_validity'] = TimestampSignatureStatus(
            **content_tst_kwargs
        )

    return timing_info


async def validate_tst_signed_data(
        tst_signed_data: cms.SignedData,
        validation_context: Optional[ValidationContext],
        expected_tst_imprint: bytes):
    """
    Validate the ``SignedData`` of a time stamp token.

    :param tst_signed_data:
        The ``SignedData`` value to validate; must encapsulate a ``TSTInfo``
        value.
    :param validation_context:
        The validation context to validate against.
    :param expected_tst_imprint:
        The expected message imprint value that should be contained in
        the encapsulated ``TSTInfo``.
    :return:
        Keyword arguments for a :class:`.TimestampSignatureStatus`.
    :raises errors.SignatureValidationError:
        If the encapsulated content is not a ``TSTInfo`` value.
    """

    encap_content = tst_signed_data['encap_content_info']['content']
    if isinstance(encap_content, core.ParsableOctetString):
        tst_info = encap_content.parsed
    else:
        tst_info = None
    if not isinstance(tst_info, tsp.TSTInfo):
        raise errors.SignatureValidationError(
            "SignedData does not encapsulate TSTInfo",
            ades_subindication=AdESFailure.FORMAT_FAILURE
        )

    gen_time = tst_info['gen_time'].native
    tsa_ku_settings = TimestampSignatureStatus.default_usage_constraints()
    validation_result = await cms_basic_validation(
        tst_signed_data, validation_context=validation_context,
        status_kwargs={'timestamp': gen_time},
        key_usage_settings=tsa_ku_settings
    )

    # The token only vouches for the data at hand if the message imprint
    # inside the signed data equals the expected digest.
    actual_imprint = tst_info['message_imprint']['hashed_message'].native
    if expected_tst_imprint != actual_imprint:
        logger.warning(
            f"Timestamp token imprint is {actual_imprint.hex()}, but expected "
            f"{expected_tst_imprint.hex()}."
        )
        validation_result['intact'] = False
    return validation_result


async def process_certified_attrs(
        acs: Iterable[cms.AttributeCertificateV2],
        signer_cert: x509.Certificate,
        validation_context: ValidationContext) \
        -> Tuple[List[ACValidationResult], List[Exception]]:
    """
    Validate a collection of attribute certificates against a validation
    context, with the signer's certificate as the holder.

    :param acs:
        The attribute certificates to validate.
    :param signer_cert:
        The signer's certificate, passed on as the holder certificate.
    :param validation_context:
        The validation context to validate against.
    :return:
        A tuple of two lists: the validation results of the attribute
        certificates that validated, and the path building and path
        validation errors raised for the others. Both are filled in as the
        validation jobs complete. Other exceptions are not caught.
    """
    pending = asyncio.as_completed([
        async_validate_ac(ac, validation_context, holder_cert=signer_cert)
        for ac in acs
    ])
    validated = []
    # deliberately not called ``errors``, to avoid shadowing the module-level
    # ``errors`` import
    failures = []
    for next_done in pending:
        try:
            outcome = await next_done
        except (PathBuildingError, PathValidationError) as exc:
            failures.append(exc)
        else:
            validated.append(outcome)
    return validated, failures


async def collect_signer_attr_status(
        sd_attr_certificates: Iterable[cms.AttributeCertificateV2],
        signer_cert: x509.Certificate,
        validation_context: Optional[ValidationContext],
        sd_signed_attrs: cms.CMSAttributes) -> dict:
    """
    Collect information about signer attributes, both from the
    ``signer_attributes_v2`` signed attribute and from the attribute
    certificates supplied separately.

    :param sd_attr_certificates:
        Attribute certificates to validate in addition to those found in
        the ``signer_attributes_v2`` attribute. These are only processed
        if a validation context is supplied.
    :param signer_cert:
        The signer's certificate, used as the holder certificate when
        validating attribute certificates.
    :param validation_context:
        Validation context for attribute certificate validation.
        If ``None``, no attribute certificates are validated.
    :param sd_signed_attrs:
        The signed attributes to search for ``signer_attributes_v2``.
    :return:
        A dictionary. The key ``cades_signer_attrs`` is set if
        a ``signer_attributes_v2`` attribute is present; the keys
        ``ac_attrs`` and ``ac_validation_errs`` are set if a validation
        context was supplied.
    :raises errors.SignatureValidationError:
        If ``signer_attributes_v2`` occurs with multiple values.
    """
    # check if we need to process signer-attrs-v2 first
    try:
        signer_attrs = \
            find_unique_cms_attribute(sd_signed_attrs, 'signer_attributes_v2')
    except NonexistentAttributeError:
        signer_attrs = None
    except MultivaluedAttributeError as e:
        # TODO downgrade to a warning?
        raise errors.SignatureValidationError(
            str(e), ades_subindication=AdESFailure.FORMAT_FAILURE
        ) from e

    result = {}
    # Both stay None unless attribute certificates from signer-attrs-v2 were
    # actually validated (which requires a validation context).
    cades_ac_results = None
    cades_ac_errors = None
    if signer_attrs is not None:
        claimed_asn1 = signer_attrs['claimed_attributes']
        # process claimed attributes (no verification possible/required,
        # so this is independent of whether we have a validation context
        # available)
        # TODO offer a strict mode where all attributes must be recognised
        #  and/or at least parseable?
        claimed = ClaimedAttributes.from_iterable(
            claimed_asn1 if not isinstance(claimed_asn1, core.Void) else ()
        )
        # extract all X.509 attribute certs
        certified_asn1 = signer_attrs['certified_attributes_v2']
        unknown_cert_attrs = False
        if not isinstance(certified_asn1, core.Void):
            # if there are certified attributes but validation_context is None,
            # then cades_ac_results remains None
            cades_acs = [
                attr.chosen for attr in certified_asn1
                if attr.name == 'attr_cert'
            ]
            # record if there were other types of certified attributes
            unknown_cert_attrs = len(cades_acs) != len(certified_asn1)
            if validation_context is not None:
                # validate retrieved AC's
                val_job = process_certified_attrs(
                    cades_acs, signer_cert, validation_context,
                )
                cades_ac_results, cades_ac_errors = await val_job

        # If we were able to validate AC's from the signers-attrs-v2 attribute,
        # compile the validation results
        if cades_ac_results is not None:
            # TODO offer a strict mode where all attributes must be recognised
            #  and/or at least parseable?
            certified = CertifiedAttributes.from_results(cades_ac_results)
        else:
            certified = None

        # If there's a validation context (i.e. the caller cares about attribute
        #  validation semantics), then log a warning message in case there were
        # signed assertions or certified attributes that we didn't understand.
        unknown_attrs = (
            unknown_cert_attrs or
            not isinstance(signer_attrs['signed_assertions'], core.Void)
        )
        if validation_context is not None and unknown_attrs:
            logger.warning(
                "CAdES signer attributes with externally certified assertions "
                "for which no validation method is available. This may affect "
                "signature semantics in unexpected ways."
            )

        # store the result of the signer-attrs-v2 processing step
        result['cades_signer_attrs'] = CAdESSignerAttributeAssertions(
            claimed_attrs=claimed, certified_attrs=certified,
            ac_validation_errs=cades_ac_errors,
            unknown_attrs_present=unknown_attrs
        )

    if validation_context is not None:
        # validate the ac's in the SD's 'certificates' entry, we have to do that
        # anyway
        ac_results, ac_errors = await process_certified_attrs(
            sd_attr_certificates, signer_cert, validation_context
        )
        # if there were validation results from the signer-attrs-v2 validation,
        # add them to the report here.
        if cades_ac_results:
            ac_results.extend(cades_ac_results)
        if cades_ac_errors:
            ac_errors.extend(cades_ac_errors)
        result['ac_attrs'] = CertifiedAttributes.from_results(ac_results)
        result['ac_validation_errs'] = ac_errors
    return result


async def async_validate_detached_cms(
        input_data: Union[bytes, IO,
                          cms.ContentInfo, cms.EncapsulatedContentInfo],
        signed_data: cms.SignedData,
        signer_validation_context: Optional[ValidationContext] = None,
        ts_validation_context: Optional[ValidationContext] = None,
        ac_validation_context: Optional[ValidationContext] = None,
        key_usage_settings: Optional[KeyUsageConstraints] = None,
        chunk_size=misc.DEFAULT_CHUNK_SIZE,
        max_read=None) -> StandardCMSSignatureStatus:
    """
    .. versionadded:: 0.9.0

    .. versionchanged:: 0.11.0
        Added ``ac_validation_context`` param.

    Validate a detached CMS signature.

    The digest of ``input_data`` is computed using the digest algorithm
    declared in the signer info, and that digest is then used for the
    timestamp checks and the basic CMS validation. Attribute certificates
    and CAdES signer attributes are processed last.

    :param input_data:
        The data covered by the signature. This can be either a
        :class:`bytes` object (or another in-memory buffer such as
        :class:`bytearray` or :class:`memoryview`), a file-like object or
        a :class:`cms.ContentInfo` / :class:`cms.EncapsulatedContentInfo`
        object.

        If a CMS content info object is passed in, the `content` field
        will be extracted.
    :param signed_data:
        The :class:`cms.SignedData` object containing the signature to verify.
    :param signer_validation_context:
        Validation context to use to verify the signer certificate's trust.
    :param ts_validation_context:
        Validation context to use to verify the TSA certificate's trust, if
        a timestamp token is present.
        By default, the same validation context as that of the signer is used.
    :param ac_validation_context:
        Validation context to use to validate attribute certificates.
        If not supplied, no AC validation will be performed.

        .. note::
            :rfc:`5755` requires attribute authority trust roots to be specified
            explicitly; hence why there's no default.
    :param key_usage_settings:
        Key usage parameters for the signer.
    :param chunk_size:
        Chunk size to use when consuming input data. Only relevant when
        ``input_data`` is a file-like object.
    :param max_read:
        Maximal number of bytes to read from the input stream. Only relevant
        when ``input_data`` is a file-like object.
    :return:
        A description of the signature's status.
    """

    if ts_validation_context is None:
        ts_validation_context = signer_validation_context
    signer_info = extract_signer_info(signed_data)
    digest_algorithm = signer_info['digest_algorithm']['algorithm'].native
    h = hashes.Hash(get_pyca_cryptography_hash(digest_algorithm))
    if isinstance(input_data, (bytes, bytearray, memoryview)):
        # in-memory buffers can be fed to the hash in one go; without this,
        # bytearray/memoryview inputs would be mistaken for streams
        h.update(input_data)
    elif isinstance(input_data, (cms.ContentInfo, cms.EncapsulatedContentInfo)):
        h.update(bytes(input_data['content']))
    else:
        # anything else is treated as a file-like object and consumed
        # chunk by chunk through a reusable buffer
        temp_buf = bytearray(chunk_size)
        misc.chunked_digest(temp_buf, input_data, h, max_read=max_read)
    digest_bytes = h.finalize()

    # timestamp-related checks come first; their output seeds the keyword
    # arguments that are passed on to the basic validation step
    status_kwargs = await collect_timing_info(
        signer_info, ts_validation_context=ts_validation_context,
        raw_digest=digest_bytes
    )
    key_usage_settings = \
        StandardCMSSignatureStatus.default_usage_constraints(key_usage_settings)
    status_kwargs = await cms_basic_validation(
        signed_data, raw_digest=digest_bytes,
        validation_context=signer_validation_context,
        status_kwargs=status_kwargs,
        key_usage_settings=key_usage_settings
    )
    cert_info = extract_certificate_info(signed_data)
    if ac_validation_context is not None:
        # make the certificates embedded in the SignedData available to the
        # AC validation context's certificate registry
        ac_validation_context.certificate_registry.register_multiple(
            cert_info.other_certs
        )
    status_kwargs.update(
        await collect_signer_attr_status(
            sd_attr_certificates=cert_info.attribute_certs,
            signer_cert=cert_info.signer_cert,
            validation_context=ac_validation_context,
            sd_signed_attrs=signer_info['signed_attrs']
        )
    )
    return StandardCMSSignatureStatus(**status_kwargs)


async def handle_certvalidator_errors(coro):
    """
    Internal error handling function that maps certvalidator errors
    to AdES status indications.

    The exception handlers are ordered deliberately: the more specific
    error types are tried before the generic :class:`PathValidationError`
    and :class:`ValidationError` fallbacks at the end.

    :param coro:
        The awaitable to run; it is awaited exactly once.
    :return:
        A three-element tuple ``(ades_status, revo_details, result)``.
        If the awaitable completes without raising, this is
        ``(None, None, result)``. If one of the handled validation errors is
        raised, ``ades_status`` is the corresponding
        :class:`AdESIndeterminate` value, ``revo_details`` is a
        :class:`RevocationDetails` object if the failure is a revocation
        that was not detected as part of a side validation (and ``None``
        otherwise), and ``result`` is ``None``.
    """
    revo_details = None
    try:
        return None, None, await coro
    except InvalidCertificateError as e:
        logger.warning(e.failure_msg, exc_info=e)
        ades_status = AdESIndeterminate.CHAIN_CONSTRAINTS_FAILURE
    except TimeSlideFailure as e:
        logger.warning(e.failure_msg, exc_info=e)
        ades_status = AdESIndeterminate.NO_POE
    except DisallowedAlgorithmError as e:
        # log this like every other handled failure, so that algorithm
        # policy violations do not disappear silently
        logger.warning(e.failure_msg, exc_info=e)
        if e.banned_since is None:
            # permaban
            ades_status = AdESIndeterminate.CRYPTO_CONSTRAINTS_FAILURE
        else:
            # could get resolved with more POEs
            ades_status = AdESIndeterminate.CRYPTO_CONSTRAINTS_FAILURE_NO_POE
    except RevokedError as e:
        logger.warning(e.failure_msg)
        if e.is_side_validation:
            # don't report this as a revocation event
            ades_status = AdESIndeterminate.CERTIFICATE_CHAIN_GENERAL_FAILURE
        elif e.is_ee_cert:
            ades_status = AdESIndeterminate.REVOKED_NO_POE
            revo_details = RevocationDetails(
                ca_revoked=False, revocation_date=e.revocation_dt,
                revocation_reason=e.reason
            )
        else:
            ades_status = AdESIndeterminate.REVOKED_CA_NO_POE
            revo_details = RevocationDetails(
                ca_revoked=True, revocation_date=e.revocation_dt,
                revocation_reason=e.reason
            )
    except PathBuildingError as e:
        logger.warning("Failed to build path", exc_info=e)
        ades_status = AdESIndeterminate.NO_CERTIFICATE_CHAIN_FOUND
    except ExpiredError as e:
        logger.warning(e.failure_msg)
        if not e.is_side_validation and e.is_ee_cert:
            # TODO modify certvalidator to perform revinfo checks on
            #  expired certs, possibly as an option. If this happens, we
            #  can potentially emit the more accurate status
            #  OUT_OF_BOUNDS_NOT_REVOKED here in cases where it applies.
            ades_status = AdESIndeterminate.OUT_OF_BOUNDS_NO_POE
        else:
            ades_status = AdESIndeterminate.CERTIFICATE_CHAIN_GENERAL_FAILURE
    except PathValidationError as e:
        logger.warning(e.failure_msg, exc_info=e)
        # TODO verify whether this is appropriate
        ades_status = AdESIndeterminate.CHAIN_CONSTRAINTS_FAILURE
    except ValidationError as e:
        logger.warning(e.failure_msg, exc_info=e)
        ades_status = AdESIndeterminate.CERTIFICATE_CHAIN_GENERAL_FAILURE

    # only reached if one of the handlers above ran
    return ades_status, revo_details, None
