"""
Base Metadata Classes
"""

# TODO: Split this definition in smaller files

import collections
import copy
import enum
import functools
import os
import re
import shutil
from collections.abc import Iterable, Sequence
from re import Pattern
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    ClassVar,
    Literal,
    Optional,
    TypedDict,
    TypeVar,
    Union,
    cast,
)

import fmf
import fmf.base
import fmf.context
import fmf.utils
import jsonschema
from click import confirm, echo
from fmf.utils import listed

import tmt.ansible
import tmt.checks
import tmt.convert
import tmt.export
import tmt.frameworks
import tmt.guest
import tmt.hardware
import tmt.identifier
import tmt.lint
import tmt.log
import tmt.result
import tmt.steps
import tmt.steps.provision
import tmt.templates
import tmt.utils
import tmt.utils.git
import tmt.utils.jira
from tmt._compat.typing import Self
from tmt.checks import Check
from tmt.container import (
    SerializableContainer,
    SpecBasedContainer,
    container,
    container_field,
    container_fields,
    field,
)
from tmt.lint import LinterOutcome, LinterReturn
from tmt.result import ResultInterpret
from tmt.utils import (
    Environment,
    FmfContext,
    Path,
    ShellScript,
    normalize_shell_script,
    to_yaml,
    verdict,
)
from tmt.utils.themes import style

if TYPE_CHECKING:
    from pint import Quantity

    import tmt.cli
    from tmt.base.plan import Plan
    from tmt.base.run import Run


T = TypeVar('T')

# Default test duration is 5m for individual tests discovered from L1
# metadata and 1h for scripts defined directly in plans (L2 metadata).
DEFAULT_TEST_DURATION_L1 = '5m'
DEFAULT_TEST_DURATION_L2 = '1h'
#: The default order of any object.
DEFAULT_ORDER = 50

DEFAULT_TEST_RESTART_LIMIT = 1
TEST_RESTART_MAX = 10

# Obsoleted test keys
OBSOLETED_TEST_KEYS = [
    "relevancy",
    "coverage",
]

# Unofficial temporary test keys
EXTRA_TEST_KEYS = [
    "id",
]

# Unofficial temporary story keys
EXTRA_STORY_KEYS = [
    "id",
]

# User-defined custom metadata prefix
EXTRA_KEYS_PREFIX = 'extra-'

# Extra keys used for identification in Result class
EXTRA_RESULT_IDENTIFICATION_KEYS = ['extra-nitrate', 'extra-task']


def _compile_section_heading_patterns(tag: str, *texts: str) -> list[Pattern[str]]:
    """
    Generate list of compiled regex patterns for given tag and texts

    :param tag: name of HTML tag
    :param texts: list of text patterns to match
    :return: list of compiled regex patterns
    """
    return [re.compile(rf'<\s*{tag}\s*>\s*{text}\s*<\s*/\s*{tag}\s*>') for text in texts]


SECTIONS_HEADINGS = {
    'Setup': _compile_section_heading_patterns('h1', 'Setup'),
    'Test': _compile_section_heading_patterns('h1', 'Test', r'Test\s+.*'),
    'Step': _compile_section_heading_patterns('h2', 'Step', r'Test\s+Step'),
    'Expect': _compile_section_heading_patterns('h2', 'Expect', 'Result', r'Expected\s+Result'),
    'Cleanup': _compile_section_heading_patterns('h1', 'Cleanup'),
}

# Reguar expression to match a required property in a schema validation error
SCHEMA_REQUIRED_PROPERTY_PATTERN = re.compile(
    r"'([a-zA-Z0-9', \-]+)' is a required property",
    flags=re.MULTILINE | re.IGNORECASE,
)

#
# fmf id types
#
# See https://fmf.readthedocs.io/en/latest/concept.html#identifiers for
# formal specification.
#


# A "raw" fmf id as stored in fmf node data, i.e. as a mapping with various keys.
# Used for a brief moment, to annotate raw fmf data before they are converted
# into FmfId instances.
class _RawFmfId(TypedDict, total=False):
    url: Optional[str]
    ref: Optional[str]
    path: Optional[str]
    name: Optional[str]


# An internal fmf id representation.
@container
class FmfId(
    SpecBasedContainer[_RawFmfId, _RawFmfId], SerializableContainer, tmt.export.Exportable['FmfId']
):
    # The list of valid fmf id keys
    VALID_KEYS: ClassVar[list[str]] = ['url', 'ref', 'path', 'name']

    #: Keys that are present, might be set, but shall not be exported.
    NONEXPORTABLE_KEYS: ClassVar[list[str]] = ['fmf_root', 'git_root', 'default_branch']

    # Save context of the ID for later - there are places where it matters,
    # e.g. to not display `ref` under some conditions.
    fmf_root: Optional[Path] = None
    git_root: Optional[Path] = None
    default_branch: Optional[str] = None

    url: Optional[str] = None
    ref: Optional[str] = None
    path: Optional[Path] = None
    name: Optional[str] = None

    # ignore[override]: expected, we do want to return more specific
    # type than the one declared in superclass.
    def to_dict(self) -> _RawFmfId:  # type: ignore[override]
        """
        Return keys and values in the form of a dictionary
        """

        return cast(_RawFmfId, super().to_dict())

    # ignore[override]: expected, we do want to return more specific
    # type than the one declared in superclass.
    def to_minimal_dict(self) -> _RawFmfId:  # type: ignore[override]
        """
        Convert to a mapping with unset keys omitted
        """

        return cast(_RawFmfId, super().to_minimal_dict())

    # TODO: make this declarative, Exportable should support this feature
    # out of the box. Maybe even add a parameter for field().
    @classmethod
    def _drop_nonexportable(cls, exported: _RawFmfId) -> _RawFmfId:
        # ignore[misc]: since `exported` is a typed dict, only literals can be used as keys
        for key in cls.NONEXPORTABLE_KEYS:
            exported.pop(key, None)  # type: ignore[misc]

        return exported

    def to_spec(self) -> _RawFmfId:
        """
        Convert to a form suitable for saving in a specification file
        """

        spec = self.to_dict()

        if self.path is not None:
            spec['path'] = str(self.path)

        return self._drop_nonexportable(spec)

    def to_minimal_spec(self) -> _RawFmfId:
        """
        Convert to specification, skip default values
        """

        spec = super().to_minimal_spec()

        if self.path is not None:
            spec['path'] = str(self.path)

        return self._drop_nonexportable(spec)

    @classmethod
    def from_spec(cls, raw: _RawFmfId) -> 'FmfId':
        """
        Convert from a specification file or from a CLI option
        """

        # TODO: with mandatory validation, this can go away.
        ref = raw.get('ref', None)
        if not isinstance(ref, (type(None), str)):
            # TODO: deliver better key address
            raise tmt.utils.NormalizationError('ref', ref, 'unset or a string')

        fmf_id = FmfId()

        for key in ('url', 'ref', 'name'):
            setattr(fmf_id, key, cast(Optional[str], raw.get(key, None)))

        for key in ('path',):
            raw_path = cast(Optional[str], raw.get(key, None))
            setattr(fmf_id, key, Path(raw_path) if raw_path is not None else None)

        return fmf_id

    def validate(self) -> tuple[bool, str]:
        """
        Validate fmf id and return a human readable error

        Return a tuple (boolean, message) as the result of validation.
        The boolean specifies the validation result and the message
        the validation error. In case the FMF id is valid, return an empty
        string as the message.
        """
        # Validate remote id and translate to human readable errors
        try:
            # Simple asdict() is not good enough, fmf does not like keys that exist but are `None`.
            # Don't include those.
            node_data = {key: value for key, value in self.items() if value is not None}
            if self.path:
                node_data['path'] = str(self.path)
            if self.url:
                node_data['url'] = tmt.utils.git.clonable_git_url(self.url)
            fmf.base.Tree.node(node_data)
        except fmf.utils.GeneralError as error:
            # Map fmf errors to more user friendly alternatives
            error_map: list[tuple[str, str]] = [
                ('git clone', f"repo '{self.url}' cannot be cloned"),
                ('git checkout', f"git ref '{self.ref}' is invalid"),
                ('directory path', f"path '{self.path}' is invalid"),
                ('tree root', f"No tree found in repo '{self.url}', missing an '.fmf' directory?"),
            ]

            stringified_error = str(error)

            errors = [message for needle, message in error_map if needle in stringified_error]
            return (False, errors[0] if errors else stringified_error)

        return (True, '')

    # cast: expected, we do want to return more specific type than the one returned by the
    # existing serialization.
    def _export(self, *, keys: Optional[list[str]] = None) -> tmt.export._RawExportedInstance:
        spec = self.to_minimal_spec()

        spec = self._drop_nonexportable(spec)

        return cast(tmt.export._RawExportedInstance, spec)

    def resolve_dynamic_ref(self, git_repository: Path, plan: "Plan") -> None:
        """
        Update ``ref`` with the final value for the dynamic reference.

        :py:attr:`ref` remains unchanged when it is not a
        :ref:`dynamic reference<dynamic-ref>`.
        """

        try:
            dynamic_ref = resolve_dynamic_ref(
                workdir=git_repository,
                ref=self.ref,
                plan=plan,
                logger=plan._logger,
            )

            if self.ref != dynamic_ref:
                plan.debug(f"Update 'ref' to '{dynamic_ref}'.")

                self.ref = dynamic_ref

        except tmt.utils.FileError as error:
            raise tmt.utils.DiscoverError(
                f"Failed to resolve dynamic ref of '{self.ref}'."
            ) from error


#
# Various types describing constructs as stored in "raw" fmf node data.
# Used for a brief moment, to annotate raw fmf data before they are converted
# into their internal representations.
#

#
# A type describing the raw form of the core `link` attribute. See
# https://tmt.readthedocs.io/en/stable/spec/core.html#link for its
# formal specification. Internally, a link is represented by a `Link`
# class instance, and types below describe the raw data coming from Fmf
# nodes and CLI options.


# Link relations.
_RawLinkRelationName = Literal[
    'verifies',
    'verified-by',
    'implements',
    'implemented-by',
    'documents',
    'documented-by',
    'blocks',
    'blocked-by',
    'duplicates',
    'duplicated-by',
    'parent',
    'child',
    'relates',
    'test-script',
    # Special case: not a relation, but it can appear where relations appear in
    # link data structures.
    'note',
]

# Link target - can be either a string (like test case name or URL), or an fmf id.
_RawLinkTarget = Union[str, _RawFmfId]

# Basic "relation-aware" link - essentially a mapping with one key/value pair.
_RawLinkRelation = dict[_RawLinkRelationName, _RawLinkTarget]

# A single link can be represented as a string or FMF ID (meaning only target is specified),
# or a "relation-aware" link aka mapping defined above.
_RawLink = Union[str, _RawFmfId, _RawLinkRelation]

# Collection of links - can be either a single link, or a list of links, and all
# link forms may be used together.
_RawLinks = Union[_RawLink, list[_RawLink]]


# A type describing `adjust` content. See
# https://tmt.readthedocs.io/en/stable/spec/core.html#adjust for its formal specification.
#
# The type is incomplete in the sense it does not describe all keys it may contain,
# like `environment` or `provision`, and focuses only on the keys defined for `adjust`
# itself.
_RawAdjustRule = TypedDict(
    '_RawAdjustRule',
    {
        'when': Optional[str],
        'continue': Optional[bool],
        'because': Optional[str],
    },
)


def create_adjust_callback(logger: tmt.log.Logger) -> fmf.base.AdjustCallback:
    """
    Create a custom callback for fmf's adjust.

    Given the ``adjust`` rules are applied on many places, for
    proper logging they need their own specific logger. Create
    a callback closure with the given logger.
    """

    def callback(node: fmf.Tree, rule: _RawAdjustRule, applied: Optional[bool]) -> None:
        if applied is None:
            logger.verbose(
                f"Adjust rule skipped on '{node.name}'",
                tmt.utils.format_value(rule, key_color='cyan'),
                color='blue',
                level=3,
                topic=tmt.log.Topic.ADJUST_DECISIONS,
            )

        elif applied is False:
            logger.verbose(
                f"Adjust rule not applied to '{node.name}'",
                tmt.utils.format_value(rule, key_color='cyan'),
                color='red',
                level=3,
                topic=tmt.log.Topic.ADJUST_DECISIONS,
            )

        else:
            logger.verbose(
                f"Adjust rule applied to '{node.name}'",
                tmt.utils.format_value(rule, key_color='cyan'),
                color='green',
                level=3,
                topic=tmt.log.Topic.ADJUST_DECISIONS,
            )

    return callback


# Types describing content accepted by various require-like keys: strings, fmf ids,
# paths, or lists mixing various types.
#
# See https://tmt.readthedocs.io/en/latest/spec/tests.html#require


class DependencySimple(str):
    """
    A basic, simple dependency, usually a package
    """

    # ignore[override]: expected, we do want to accept and return more
    # specific types than those declared in superclass.
    @classmethod
    def from_spec(cls, spec: str) -> 'DependencySimple':
        return DependencySimple(spec)

    def to_spec(self) -> str:
        return str(self)

    def to_minimal_spec(self) -> str:
        return self.to_spec()


class _RawDependencyFmfId(_RawFmfId):
    destination: Optional[str]
    nick: Optional[str]
    type: Optional[str]


@container
class DependencyFmfId(
    FmfId,
    # Repeat the SpecBasedContainer, with more fitting in/out spec type.
    SpecBasedContainer[_RawDependencyFmfId, _RawDependencyFmfId],
):
    """
    A fmf ID as a dependency.

    Not a pure fmf ID though, the form accepted by `require` & co. allows
    several extra keys.
    """

    VALID_KEYS: ClassVar[list[str]] = [*FmfId.VALID_KEYS, 'destination', 'nick', 'type']

    destination: Optional[Path] = None
    nick: Optional[str] = None
    # fmf id dependency is a beakerlib dependency, as of now, there is no other
    # allowed `type` value.
    type: str = 'library'

    # TODO: frozen=True would be better, but our containers may get modified,
    # and fixing that would result in a big patch. To allow use of dependencies
    # in sets, provide __hash__ & fix the rest later.
    def __hash__(self) -> int:
        return hash(getattr(self, key) for key in self.VALID_KEYS)

    # ignore[override]: expected, we do want to return more specific
    # type than the one declared in superclass.
    def to_dict(self) -> _RawDependencyFmfId:  # type: ignore[override]
        return cast(_RawDependencyFmfId, super().to_dict())

    # ignore[override]: expected, we do want to return more specific
    # type than the one declared in superclass.
    def to_minimal_dict(self) -> _RawDependencyFmfId:  # type: ignore[override]
        """
        Convert to a mapping with unset keys omitted
        """

        return cast(_RawDependencyFmfId, super().to_minimal_dict())

    def to_spec(self) -> _RawDependencyFmfId:
        """
        Convert to a form suitable for saving in a specification file
        """

        spec = self.to_dict()

        if self.destination is not None:
            spec['destination'] = str(self.destination)

        return spec

    def to_minimal_spec(self) -> _RawDependencyFmfId:
        """
        Convert to specification, skip default values
        """

        spec = self.to_minimal_dict()

        if self.destination is not None:
            spec['destination'] = str(self.destination)

        return spec

    # ignore[override]: expected, we do want to accept and return more
    # specific types than those declared in superclass.
    @classmethod
    def from_spec(cls, raw: _RawDependencyFmfId) -> 'DependencyFmfId':  # type: ignore[override]
        """
        Convert from a specification file or from a CLI option
        """

        # TODO: with mandatory validation, this can go away.
        ref = raw.get('ref', None)
        if not isinstance(ref, (type(None), str)):
            # TODO: deliver better key address
            raise tmt.utils.NormalizationError('ref', ref, 'unset or a string')

        fmf_id = DependencyFmfId()

        for key in ('url', 'ref', 'name', 'nick'):
            raw_value = raw.get(key, None)

            setattr(fmf_id, key, None if raw_value is None else str(raw_value))

        for key in ('path', 'destination'):
            raw_path = cast(Optional[str], raw.get(key, None))
            setattr(fmf_id, key, Path(raw_path) if raw_path is not None else None)

        return fmf_id


class _RawDependencyFile(TypedDict):
    type: Optional[str]
    pattern: Optional[list[str]]


@container
class DependencyFile(
    SpecBasedContainer[_RawDependencyFile, _RawDependencyFile],
    SerializableContainer,
    tmt.export.Exportable['DependencyFile'],
):
    VALID_KEYS: ClassVar[list[str]] = ['type', 'pattern']

    type: str = 'file'
    pattern: list[str] = field(default_factory=list, normalize=tmt.utils.normalize_string_list)

    # TODO: frozen=True would be better, but our containers may get modified,
    # and fixing that would result in a big patch. To allow use of dependencies
    # in sets, provide __hash__ & fix the rest later.
    def __hash__(self) -> int:
        values = tuple(getattr(self, key) for key in self.VALID_KEYS if key != 'pattern') + tuple(
            pattern for pattern in self.pattern
        )

        return hash(values)

    # ignore[override]: expected, we do want to return more specific
    # type than the one declared in superclass.
    def to_dict(self) -> _RawDependencyFile:  # type: ignore[override]
        """
        Return keys and values in the form of a dictionary
        """
        return cast(_RawDependencyFile, super().to_dict())

    # ignore[override]: expected, we do want to return more specific
    # type than the one declared in superclass.
    def to_minimal_dict(self) -> _RawDependencyFile:  # type: ignore[override]
        """
        Convert to a mapping with unset keys omitted
        """
        return cast(_RawDependencyFile, super().to_minimal_dict())

    # ignore[override]: expected, we do want to accept and return more
    # specific types than those declared in superclass.
    @classmethod
    def from_spec(cls, raw: _RawDependencyFile) -> 'DependencyFile':
        """
        Convert from a specification file or from a CLI option
        """
        dependency = cls()

        raw_patterns = raw.get('pattern', [])
        if not raw_patterns:
            raise tmt.utils.SpecificationError(f'Missing pattern for file require: {raw}')

        if isinstance(raw_patterns, str):
            dependency.pattern.append(raw_patterns)
        else:
            dependency.pattern += [str(pattern) for pattern in raw_patterns]

        return dependency

    @staticmethod
    def validate() -> tuple[bool, str]:
        """
        Validate file dependency and return a human readable error

        There is no way to check validity of type or pattern string at
        this time. Return a tuple (boolean, message) as the result of
        validation. The boolean specifies the validation result and the
        message the validation error. In case the file dependency is
        valid, return an empty string as the message.
        """
        return True, ''

    def _export(self, *, keys: Optional[list[str]] = None) -> tmt.export._RawExportedInstance:
        return cast(tmt.export._RawExportedInstance, self.to_dict())


_RawDependencyItem = Union[str, _RawDependencyFmfId, _RawDependencyFile]
_RawDependency = Union[_RawDependencyItem, list[_RawDependencyItem]]

Dependency = Union[DependencySimple, DependencyFmfId, DependencyFile]


def dependency_factory(raw_dependency: Optional[_RawDependencyItem]) -> Dependency:
    """
    Select the correct require class
    """
    if isinstance(raw_dependency, dict):
        dependency_type = raw_dependency.get('type', 'library')
        if dependency_type == 'library':  # can't use isinstance check with TypedDict
            return DependencyFmfId.from_spec(raw_dependency)  # type: ignore[arg-type]
        if dependency_type == 'file':
            return DependencyFile.from_spec(raw_dependency)  # type: ignore[arg-type]

    assert isinstance(raw_dependency, str)  # check type
    return DependencySimple.from_spec(raw_dependency)


def normalize_require(
    key_address: str, raw_require: Optional[_RawDependency], logger: tmt.log.Logger
) -> list[Dependency]:
    """
    Normalize content of ``require`` key.

    The requirements may be defined as either string or a special fmf id
    flavor, or a mixed list of these types. The goal here is to reduce the
    space of possibilities to a list, with fmf ids being converted to their
    internal representation.
    """

    if raw_require is None:
        return []

    if isinstance(raw_require, (str, dict)):
        return [dependency_factory(raw_require)]

    if isinstance(raw_require, (list, tuple)):
        return [dependency_factory(require) for require in raw_require]

    raise tmt.utils.NormalizationError(
        key_address, raw_require, 'a string, a library, a file or a list of their combinations'
    )


def assert_simple_dependencies(
    dependencies: list[Dependency], error_message: str, logger: tmt.log.Logger
) -> list[DependencySimple]:
    """
    Make sure the list of dependencies consists of simple ones.

    :param dependencies: the list of requires.
    :param error_message: used for a raised exception.
    :param logger: used for logging.
    :raises GeneralError: when there is a dependency on the list which
        is not a subclass of :py:class:`tmt.base.core.DependencySimple`.
    """

    non_simple_dependencies = list(
        filter(lambda dependency: not isinstance(dependency, DependencySimple), dependencies)
    )

    if not non_simple_dependencies:
        return cast(list[DependencySimple], dependencies)

    for dependency in non_simple_dependencies:
        logger.fail(f'Invalid requirement: {dependency}')

    raise tmt.utils.GeneralError(error_message)


CoreT = TypeVar('CoreT', bound='Core')


def _normalize_link(key_address: str, value: _RawLinks, logger: tmt.log.Logger) -> 'Links':
    return Links(data=value)


@container(repr=False)
class Core(
    tmt.utils.ValidateFmfMixin,
    tmt.utils.LoadFmfKeysMixin,
    tmt.utils.Common,
):
    """
    General node object

    Corresponds to given fmf.Tree node.
    Implements common Test, Plan and Story methods.
    Also defines L0 metadata and its manipulation.
    """

    # Core attributes (supported across all levels)
    summary: Optional[str] = None
    description: Optional[str] = None
    author: list[str] = field(
        default_factory=list,
        normalize=tmt.utils.normalize_string_list,
    )
    contact: list[str] = field(
        default_factory=list,
        normalize=tmt.utils.normalize_string_list,
    )
    enabled: bool = True
    order: int = field(
        default=DEFAULT_ORDER,
        normalize=lambda key_address, raw_value, logger: (
            DEFAULT_ORDER if raw_value is None else int(raw_value)
        ),
    )
    link: Optional['Links'] = field(
        default=cast(Optional['Links'], None),
        normalize=_normalize_link,
        exporter=lambda value: value.to_spec() if value is not None else [],
    )
    id: Optional[str] = None
    tag: list[str] = field(
        default_factory=list,
        normalize=tmt.utils.normalize_string_list,
    )
    tier: Optional[str] = field(
        default=None,
        normalize=lambda key_address, raw_value, logger: (
            None if raw_value is None else str(raw_value)
        ),
    )
    adjust: Optional[list[_RawAdjustRule]] = field(
        default_factory=list,
        normalize=lambda key_address, raw_value, logger: (
            []
            if raw_value is None
            else ([raw_value] if not isinstance(raw_value, list) else raw_value)
        ),
    )

    _KEYS_SHOW_ORDER = [
        # Basic stuff
        'summary',
        'description',
        'author',
        'contact',
        'enabled',
        'order',
        'id',
        # Filtering and more
        'tag',
        'tier',
        'link',
        'adjust',
    ]

    def __init__(
        self,
        *,
        node: fmf.Tree,
        tree: Optional['Tree'] = None,
        parent: Optional[tmt.utils.Common] = None,
        logger: tmt.log.Logger,
        name: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        """
        Initialize the node
        """
        super().__init__(node=node, logger=logger, parent=parent, name=name or node.name, **kwargs)

        self.node = node
        self.tree = tree

        # Verify id is not inherited from parent.
        if self.id and not tmt.utils.is_key_origin(node, 'id'):
            raise tmt.utils.SpecificationError(
                f"The 'id' key '{node.get('id')}' in '{self.name}' "
                f"is inherited from parent, should be defined in a leaf."
            )

        # Store original metadata with applied defaults and including
        # keys which are not defined in the L1 metadata specification
        # Once the whole node has been initialized,
        # self._update_metadata() must be called to work correctly.
        self._metadata = self.node.data.copy()

    def __str__(self) -> str:
        """
        Node name
        """
        return self.name

    @classmethod
    def from_tree(cls, tree: 'tmt.Tree') -> list[Self]:
        """
        Gather list of instances of this class in a given tree.

        Helpful when looking for objects of a class derived from :py:class:`Core`
        in a given tree, encapsulating the mapping between core classes and tree
        search methods.

        :param tree: tree to search for objects.
        """

        return cast(list[Self], getattr(tree, f'{cls.__name__.lower()}s')())

    def _update_metadata(self) -> None:
        """
        Update the _metadata attribute
        """
        self._metadata.update(self._export())
        self._metadata['name'] = self.name

    def _show_additional_keys(self) -> None:
        """
        Show source files
        """
        if self.id is not None:
            echo(tmt.utils.format('id', self.id, key_color='magenta'))
        echo(tmt.utils.format('sources', self.fmf_sources, key_color='magenta'))
        self._fmf_id()
        web_link = self.web_link()
        if web_link is not None:
            echo(tmt.utils.format('web', web_link, key_color='magenta', wrap=False))

    def _fmf_id(self) -> None:
        """
        Show fmf identifier
        """
        echo(
            tmt.utils.format(
                'fmf-id', cast(dict[str, Any], self.fmf_id.to_minimal_spec()), key_color='magenta'
            )
        )

    # TODO: cached_property candidates
    @property
    def fmf_root(self) -> Optional[Path]:
        # Check if fmf root is defined
        if self.node.root is not None:
            return Path(self.node.root)
        return None

    @property
    def anchor_path(self) -> Path:
        return self.fmf_root or Path.cwd()

    @property
    def git_root(self) -> Optional[Path]:
        return tmt.utils.git.git_root(fmf_root=self.anchor_path, logger=self._logger)

    # Caching properties does not play nicely with mypy and annotations,
    # and constructing a workaround would be hard because of support of
    # Python 3.6 tmt wishes to maintain.
    # https://github.com/python/mypy/issues/5858
    @property
    def fmf_id(self) -> FmfId:
        """
        Return full fmf identifier of the node
        """

        # When the object is built from an fmf node without a root, it's
        # probably not possible for the object to have any fmf ID in the
        # context of a non-existent fmf tree.
        if self.node.root is None:
            return FmfId()

        return tmt.utils.fmf_id(name=self.name, fmf_root=self.anchor_path, logger=self._logger)

    @functools.cached_property
    def fmf_sources(self) -> list[Path]:
        return [Path(source) for source in self.node.sources]

    def web_link(self) -> Optional[str]:
        """
        Return a clickable web link to the fmf metadata location
        """
        if self.fmf_id.ref is None or self.fmf_id.url is None:
            return None

        if not self.fmf_sources:
            return None

        # Detect relative path of the last source from the metadata tree root
        relative_path = self.fmf_sources[-1].relative_to(self.node.root)
        relative_path = Path('/') if str(relative_path) == '.' else Path('/') / relative_path

        # Add fmf path if the tree is nested deeper in the git repo
        if self.fmf_id.path:
            relative_path = self.fmf_id.path / relative_path.relative_to('/')

        return tmt.utils.git.web_git_url(self.fmf_id.url, self.fmf_id.ref, relative_path)

    @classmethod
    def store_cli_invocation(
        cls, context: Optional['tmt.cli.Context'], options: Optional[dict[str, Any]] = None
    ) -> 'tmt.cli.CliInvocation':
        """
        Save provided command line context for future use
        """
        invocation = super().store_cli_invocation(context, options)

        # Handle '.' as an alias for the current working directory
        names = cls._opt('names')
        if context is not None and names is not None and '.' in names:
            assert context.obj.tree.root is not None  # narrow type
            root = context.obj.tree.root
            current = Path.cwd()
            # Handle special case when directly in the metadata root
            if current.resolve() == root.resolve():
                pattern = '/'
            # Prepare path from the tree root to the current directory
            else:
                # Prevent matching common prefix from other directories
                pattern = f"{current.relative_to(root)}(/|$)"
            invocation.options['names'] = tuple(pattern if name == '.' else name for name in names)

        return invocation

    def name_and_summary(self) -> str:
        """
        Node name and optional summary
        """
        if self.summary:
            return f'{self.name} ({self.summary})'
        return self.name

    def ls(self, summary: bool = False) -> None:
        """
        List node
        """
        echo(style(self.name, fg='red'))
        if summary and self.summary:
            echo(tmt.utils.format('summary', self.summary))

    def _export(
        self, *, keys: Optional[list[str]] = None, include_internal: bool = False
    ) -> tmt.export._RawExportedInstance:
        if keys is None:
            keys = self._keys()

        # Always include node name, add requested keys, ignore adjust
        data: dict[str, Any] = {'name': self.name}
        for key in keys:
            # TODO: provide more mature solution for https://github.com/teemtee/tmt/issues/1688
            # Until that, do not export fields that start with an underscore, to avoid leaking
            # "private" object members.
            if key.startswith('_'):
                continue

            # TODO: override the method, or provide better way to filter out this class var
            if key == '_export_plugin_registry':
                continue

            if key == 'adjust':
                continue

            # TODO: once `Core` classes become `DataContainers`, we would be able to get this
            # done automagically. Until then, using proper conversion helpers to emit correct
            # key/value pairs.
            _, option, value, _, metadata = container_field(self, key)

            if metadata.internal and not include_internal:
                continue

            if metadata.export_callback:
                data[option] = metadata.export_callback(value)

            else:
                data[option] = value

        data['sources'] = [str(path) for path in self.fmf_sources]

        return data

    def _lint_keys(self, additional_keys: list[str]) -> list[str]:
        """
        Return list of invalid keys used, empty when all good
        """

        known_keys: list[str] = []

        for field_ in container_fields(self):
            _, key, _, _, metadata = container_field(self, field_.name)

            if metadata.internal:
                continue

            known_keys.append(key)

        known_keys.extend(additional_keys)

        return [
            key
            for key in self.node.get()
            if key not in known_keys and not key.startswith(EXTRA_KEYS_PREFIX)
        ]

    def lint_validate(self) -> LinterReturn:
        """
        C000: fmf node should pass the schema validation
        """

        schema_name = self.__class__.__name__.lower()

        errors = tmt.utils.validate_fmf_node(self.node, f'{schema_name}.yaml', self._logger)

        if errors:
            # A bit of error formatting. It is possible to use str(error), but the result
            # is a bit too JSON-ish. Let's present an error message in a way that helps
            # users to point finger on each and every issue.

            def detect_unallowed_properties(error: jsonschema.ValidationError) -> LinterReturn:
                match = re.search(
                    r"(?mi)Additional properties are not allowed \(([a-zA-Z0-9', \-]+) (?:was|were) unexpected\)",  # noqa: E501
                    str(error),
                )

                if not match:
                    return

                for bad_property in match.group(1).replace("'", '').replace(' ', '').split(','):
                    if isinstance(error.schema, dict) and '$id' in error.schema:
                        yield (
                            LinterOutcome.WARN,
                            f'key "{bad_property}" not recognized by schema {error.schema["$id"]}',
                        )
                    else:
                        yield LinterOutcome.WARN, f'key "{bad_property}" not recognized by schema'

            # A key not recognized, but when patternProperties are allowed. In that case,
            # the key is both not listed and not matching the pattern.
            def detect_unallowed_properties_with_pattern(
                error: jsonschema.ValidationError,
            ) -> LinterReturn:
                match = re.search(
                    r"(?mi)'([a-zA-Z0-9', \-]+)' (?:does|do) not match any of the regexes: '([a-zA-Z0-9\-\^\+\*]+)'",  # noqa: E501
                    str(error),
                )

                if not match:
                    return

                for bad_property in match.group(1).replace("'", '').replace(' ', '').split(','):
                    if isinstance(error.schema, dict) and '$id' in error.schema:
                        yield (
                            LinterOutcome.WARN,
                            (
                                f'key "{bad_property}" not recognized by schema {error.schema["$id"]}, '  # noqa: E501
                                f'and does not match "{match.group(2)}" pattern'
                            ),
                        )

                    else:
                        yield (
                            LinterOutcome.WARN,
                            (
                                f'key "{bad_property}" not recognized by schema, '
                                f'and does not match "{match.group(2)}" pattern'
                            ),
                        )

            # A key value is not recognized. This is often a case with keys whose values are
            # limited by an enum, like `how`. Unfortunately, validator will record every mismatch
            # between a value and enum even if eventually a match is found. So it might be tempting
            # to ignore this particular kind of error - it may also indicate a genuine typo in
            # key name or completely misplaced key, so it's still necessary to report the error.
            def detect_enum_violations(error: jsonschema.ValidationError) -> LinterReturn:
                match = re.search(
                    r"(?mi)'([a-z\-]+)' is not one of \['([a-zA-Z\-]+)'\]", str(error)
                )

                if not match:
                    return

                # Older jsonpatch packages may lack this attribute
                if not hasattr(error, 'json_path'):
                    json_path = '$'

                    for elem in error.absolute_path:
                        if isinstance(elem, int):
                            json_path += f'[{elem}]'
                        else:
                            json_path += f'.{elem}'

                else:
                    json_path = error.json_path

                yield (
                    LinterOutcome.WARN,
                    f'value of "{json_path.split(".")[-1]}" is not "{match.group(2)}"',
                )

            def detect_missing_required_properties(
                error: jsonschema.ValidationError,
            ) -> LinterReturn:
                match = SCHEMA_REQUIRED_PROPERTY_PATTERN.search(str(error))

                if not match:
                    return

                if isinstance(error.schema, dict) and '$id' in error.schema:
                    message = (
                        f'"{match.group(1)}" is a required property by '
                        f'schema {error.schema["$id"]}'
                    )

                else:
                    message = f'"{match.group(1)}" is a required property by schema'

                yield (LinterOutcome.WARN, message)

            def detect_errors(error: jsonschema.ValidationError) -> LinterReturn:
                yield from detect_unallowed_properties(error)
                yield from detect_unallowed_properties_with_pattern(error)
                yield from detect_enum_violations(error)
                yield from detect_missing_required_properties(error)

            for error, _ in errors:
                yield from detect_errors(error)

                # Validation errors can have "context", a list of "sub" errors encountered during
                # validation. Interesting ones are identified & added to our error message.
                if error.context:
                    for suberror in error.context:
                        yield from detect_errors(suberror)

            yield LinterOutcome.WARN, 'fmf node failed schema validation'

            return

        yield LinterOutcome.PASS, 'fmf node passes schema validation'

    def lint_summary_exists(self) -> LinterReturn:
        """
        C001: summary key should be set and should be reasonably long
        """

        if not self.summary:
            yield LinterOutcome.WARN, 'summary key is missing'
            return

        if len(self.summary) > 50:
            yield LinterOutcome.WARN, 'summary should not exceed 50 characters'
            return

        yield LinterOutcome.PASS, 'summary key is set and is reasonably long'

    def has_link(self, needle: 'LinkNeedle') -> bool:
        """
        Whether object contains specified link
        """

        if self.link is None:
            return False

        return self.link.has_link(needle=needle)


@container(repr=False)
class Test(
    # TODO: `Test` does "have" environment, but it's a genuine attribute,
    # not a property, and this interface will not work.
    # HasEnvironment,
    Core,
    tmt.export.Exportable['Test'],
    tmt.lint.Lintable['Test'],
):
    """
    Test object (L1 Metadata)
    """

    # Basic test information
    component: list[str] = field(
        default_factory=list,
        normalize=tmt.utils.normalize_string_list,
    )

    # Test execution data
    # TODO: mandatory schema validation would remove the need for Optional...
    # `test` is mandatory, must exist, so how to initialize if it's missing :(
    test: Optional[ShellScript] = field(
        default=None,
        normalize=normalize_shell_script,
        exporter=lambda value: str(value) if isinstance(value, ShellScript) else None,
    )
    path: Optional[Path] = field(
        default=None,
        normalize=tmt.utils.normalize_path,
        exporter=lambda value: str(value) if isinstance(value, Path) else None,
    )
    framework: str = "shell"
    manual: bool = False
    tty: bool = False

    require: list[Dependency] = field(
        default_factory=list,
        normalize=normalize_require,
        exporter=lambda value: [dependency.to_minimal_spec() for dependency in value],
    )
    recommend: list[Dependency] = field(
        default_factory=list,
        normalize=normalize_require,
        exporter=lambda value: [dependency.to_minimal_spec() for dependency in value],
    )
    environment: tmt.utils.Environment = field(
        default_factory=tmt.utils.Environment,
        normalize=tmt.utils.Environment.normalize,
        serialize=lambda environment: environment.to_fmf_spec(),
        unserialize=lambda serialized: tmt.utils.Environment.from_fmf_spec(serialized),
        exporter=lambda environment: environment.to_fmf_spec(),
    )

    duration: str = DEFAULT_TEST_DURATION_L1
    result: ResultInterpret = field(
        default=ResultInterpret.RESPECT,
        normalize=ResultInterpret.normalize,
        serialize=lambda result: result.value,
        unserialize=ResultInterpret.from_spec,
        exporter=lambda result: result.value,
    )

    where: list[str] = field(default_factory=list)

    check: list[Check] = field(
        default_factory=list,
        normalize=tmt.checks.normalize_test_checks,
        serialize=lambda checks: [check.to_spec() for check in checks],
        unserialize=lambda serialized: [Check.from_spec(**check) for check in serialized],
        exporter=lambda value: [check.to_minimal_spec() for check in value],
    )

    restart_on_exit_code: list[int] = field(
        default_factory=list, normalize=tmt.utils.normalize_integer_list
    )
    restart_max_count: int = field(
        default=DEFAULT_TEST_RESTART_LIMIT,
        # TODO: enforce upper limit, TEST_RESTART_MAX
    )
    restart_with_reboot: bool = False

    serial_number: int = field(default=0, internal=True)

    _original_require: list[Dependency] = field(
        internal=True,
        default_factory=list,
    )
    _original_recommend: list[Dependency] = field(
        internal=True,
        default_factory=list,
    )

    _KEYS_SHOW_ORDER = [
        # Basic test information
        'summary',
        'description',
        'author',
        'contact',
        'component',
        'id',
        # Test execution data
        'test',
        'path',
        'framework',
        'manual',
        'tty',
        'require',
        'recommend',
        'environment',
        'duration',
        'enabled',
        'order',
        'result',
        'check',
        'restart_on_exit_code',
        'restart_max_count',
        'restart_with_reboot',
        # Filtering attributes
        'tag',
        'tier',
        'link',
    ]

    @classmethod
    def from_dict(
        cls,
        *,
        mapping: dict[str, Any],
        name: str,
        skip_validation: bool = False,
        raise_on_validation_error: bool = False,
        logger: tmt.log.Logger,
        **kwargs: Any,
    ) -> 'Test':
        """
        Initialize test data from a dictionary.

        Useful when data describing a test are stored in a mapping instead of an fmf node.
        """

        if not name.startswith('/'):
            raise tmt.utils.SpecificationError("Test name should start with a '/'.")

        node = fmf.Tree(mapping)
        node.name = name

        return cls(
            logger=logger,
            node=node,
            skip_validation=skip_validation,
            raise_on_validation_error=raise_on_validation_error,
            **kwargs,
        )

    def __init__(
        self,
        *,
        node: fmf.Tree,
        tree: Optional['Tree'] = None,
        skip_validation: bool = False,
        raise_on_validation_error: bool = False,
        logger: tmt.log.Logger,
        **kwargs: Any,
    ) -> None:
        """
        Initialize test data from an fmf node or a dictionary

        The following two methods are supported:

            Test(node)
        """

        # Path defaults to the directory where metadata are stored or to
        # the root '/' if fmf metadata were not stored on the filesystem
        #
        # NOTE: default value of `path` attribute is not known when attribute
        # is declared, therefore we need to compute the default value and
        # assign it to attribute *before* calling superclass and its handy
        # node key extraction.
        try:
            directory = Path(node.sources[-1]).parent
            relative_path = directory.relative_to(Path(node.root))
            default_path = Path('/') if relative_path == Path('.') else Path('/') / relative_path
        except (AttributeError, IndexError):
            default_path = Path('/')

        self.path = default_path

        super().__init__(
            node=node,
            tree=tree,
            logger=logger,
            skip_validation=skip_validation,
            raise_on_validation_error=raise_on_validation_error,
            **kwargs,
        )

        # TODO: As long as validation is optional, a missing `test` key would be reported
        # as such but won't stop tmt from moving on.
        if self.test is None:
            raise tmt.utils.SpecificationError(
                f"The 'test' attribute in '{self.name}' must be defined."
            )

        self._update_metadata()
        self._original_require = self.require.copy()
        self._original_recommend = self.recommend.copy()

    @staticmethod
    def overview(tree: 'Tree') -> None:
        """
        Show overview of available tests
        """
        tests = [style(str(test), fg='red') for test in tree.tests()]
        echo(
            style(
                'Found {}{}{}.'.format(
                    listed(tests, 'test'), ': ' if tests else '', listed(tests, max=12)
                ),
                fg='blue',
            )
        )

    @staticmethod
    def create(
        *,
        names: list[str],
        template: str,
        path: Path,
        script: Optional[str] = None,
        force: bool = False,
        dry: Optional[bool] = None,
        logger: tmt.log.Logger,
    ) -> None:
        """
        Create a new test
        """
        if dry is None:
            dry = Test._opt('dry')

        def _get_template_content(template: str, template_type: str) -> str:
            if tmt.utils.is_url(template):
                return tmt.templates.MANAGER.render_from_url(template, logger)
            templates = tmt.templates.MANAGER.templates[template_type]
            try:
                return tmt.templates.MANAGER.render_file(templates[template])
            except KeyError as error:
                raise tmt.utils.GeneralError(f"Invalid template '{template}'.") from error

        metadata_content = _get_template_content(template, 'test')
        if script:
            script_content = _get_template_content(script, 'script')
        else:
            if tmt.utils.is_url(template):
                raise tmt.utils.GeneralError(
                    "You need to specify 'script' when using URL for 'template'."
                )
            # If script template is not set, use the same template name for the script
            script_content = _get_template_content(template, 'script')

        # Append link with appropriate relation
        links = Links(data=list(cast(list[_RawLink], Test._opt('link', []))))
        if links:  # Output 'links' if and only if it is not empty
            metadata_content += to_yaml({'link': links.to_spec()})

        for name in names:
            # Create directory
            if name == '.':
                directory_path = Path.cwd()
            else:
                directory_path = path / name.lstrip('/')
                tmt.utils.create_directory(
                    path=directory_path, name='test directory', dry=dry, logger=logger
                )

            # Create metadata
            tmt.utils.create_file(
                path=directory_path / 'main.fmf',
                name='test metadata',
                content=metadata_content,
                dry=dry,
                force=force,
                logger=logger,
            )

            # Create script
            tmt.utils.create_file(
                path=directory_path / 'test.sh',
                name='test script',
                content=script_content,
                mode=0o755,
                dry=dry,
                force=force,
                logger=logger,
            )

            if links.get('verifies') and dry is False:
                tests = Tree(path=path, logger=logger).tests(
                    names=[f"^{name}$"], apply_command_line=False
                )
                tmt.utils.jira.link(tmt_objects=tests, links=links, logger=logger)

    @property
    def manual_test_path(self) -> Path:
        assert self.manual, 'Test is not manual yet path to manual instructions was requested'

        assert self.test
        assert self.path

        return Path(self.node.root) / self.path.unrooted() / str(self.test)

    @property
    def test_framework(self) -> tmt.frameworks.TestFrameworkClass:
        framework = tmt.frameworks._FRAMEWORK_PLUGIN_REGISTRY.get_plugin(self.framework)

        if framework is None:
            raise tmt.utils.SpecificationError(f"Unknown framework '{self.framework}'.")

        return framework

    def enabled_on_guest(self, guest: tmt.guest.Guest) -> bool:
        """
        Check if the test is enabled on the specific guest
        """

        if not self.where:
            return True

        return any(destination in (guest.name, guest.role) for destination in self.where)

    def show_manual(self) -> None:
        """
        Show manual test instructions
        """

        if self.tree is None or self.tree.root is None:
            return

        if self.test is None or self.test._script is None:
            return

        if self.path is not None:
            instructions_path = self.tree.root / self.path.unrooted() / self.test._script
        else:
            instructions_path = self.tree.root / self.test._script

        try:
            instructions = instructions_path.read_text()
            echo(tmt.utils.format('instructions', instructions))

        except FileNotFoundError:
            self.warn(f"Manual test instructions file '{instructions_path}' not found.")

    def show(self) -> None:
        """
        Show test details
        """
        self.ls()
        for key in self._KEYS_SHOW_ORDER:
            _, _, value, _, metadata = container_field(self, key)

            if metadata.internal:
                continue

            if key == 'link' and value is not None:
                value.show()
                continue
            # No need to show the default order
            if key == 'order' and value == DEFAULT_ORDER:
                continue
            if key in ('require', 'recommend') and value:
                echo(
                    tmt.utils.format(
                        key,
                        [
                            dependency.to_minimal_spec()
                            for dependency in cast(list[Dependency], value)
                        ],
                    )
                )
                continue
            if key == 'check' and value:
                echo(
                    tmt.utils.format(key, [check.to_spec() for check in cast(list[Check], value)])
                )
                continue
            if key == 'result':
                echo(tmt.utils.format(key, value.value))
                continue
            if value not in [None, [], {}]:
                echo(tmt.utils.format(key, value))

            # Show test instructions for manual tests in verbose mode
            if key == "manual" and self.manual and self.verbosity_level:
                self.show_manual()

        if self.verbosity_level:
            self._show_additional_keys()
        if self.verbosity_level >= 2:
            # Print non-empty unofficial attributes
            for key in sorted(self.node.get().keys()):
                # Already asked to be printed
                if key in self._keys():
                    continue
                value = self.node.get(key)
                if value not in [None, [], {}]:
                    echo(tmt.utils.format(key, value, key_color='blue'))

    @functools.cached_property
    def ids(self) -> tmt.result.ResultIds:
        """
        Get a dictionary with ids like tmt id, extra-nitrate and extra-task for report step
        """
        ids: tmt.result.ResultIds = {tmt.identifier.ID_KEY: self.id}
        for key in EXTRA_RESULT_IDENTIFICATION_KEYS:
            value: Any = cast(Any, self.node.get(key))
            ids[key] = None if value is None else str(value)

        return ids

    # FIXME - Make additional attributes configurable
    def lint_unknown_keys(self) -> LinterReturn:
        """
        T001: all keys are known
        """

        # We don't want adjust in show/export so it is not yet in Test._keys
        invalid_keys = self._lint_keys(EXTRA_TEST_KEYS + OBSOLETED_TEST_KEYS + ['adjust'])

        if invalid_keys:
            for key in invalid_keys:
                yield LinterOutcome.FAIL, f'unknown key "{key}" is used'

            return

        yield LinterOutcome.PASS, 'correct keys are used'

    def lint_defined_test(self) -> LinterReturn:
        """
        T002: test script must be defined
        """

        if not self.test:
            yield LinterOutcome.FAIL, 'test script is not defined'
            return

        yield LinterOutcome.PASS, 'test script is defined'

    def lint_absolute_path(self) -> LinterReturn:
        """
        T003: test directory path must be absolute
        """

        if not self.path:
            yield LinterOutcome.FAIL, 'directory path is not set'
            return

        if not self.path.is_absolute():
            yield LinterOutcome.FAIL, 'directory path is not absolute'
            return

        yield LinterOutcome.PASS, 'directory path is absolute'

    def lint_path_exists(self) -> LinterReturn:
        """
        T004: test directory path must exist
        """

        if not self.path:
            yield LinterOutcome.FAIL, 'directory path is not set'
            return

        test_path = Path(self.node.root) / self.path.unrooted()

        if not test_path.exists():
            yield LinterOutcome.FAIL, f"test path '{test_path}' does not exist"
            return

        yield LinterOutcome.PASS, f"test path '{test_path}' does exist"

    def lint_legacy_relevancy_rules(self) -> LinterReturn:
        """
        T005: relevancy has been obsoleted by adjust
        """

        relevancy = self.node.get('relevancy')

        if not relevancy:
            yield LinterOutcome.SKIP, 'legacy relevancy not detected'
            return

        # Convert into adjust rules if --fix enabled
        if not self.opt('fix'):
            yield LinterOutcome.FAIL, 'relevancy has been obsoleted by adjust'
            return

        if not tmt.utils.is_key_origin(self.node, 'relevancy'):
            yield (
                LinterOutcome.FAIL,
                'relevancy detected but inherited from test parent, please, fix manually',
            )
            return

        filename = self.fmf_sources[-1]
        # `rt` mode is needed here to preserve comments during the `fix`, see comment in `_yaml`
        metadata: dict[str, Any] = tmt.utils.yaml_to_dict(self.read(filename), yaml_type="rt")

        metadata['adjust'] = tmt.convert.relevancy_to_adjust(
            metadata.pop('relevancy'), self._logger
        )
        self.write(filename, to_yaml(metadata, yaml_type="rt"))

        yield LinterOutcome.FIXED, 'relevancy converted into adjust'

    def lint_legacy_coverage_key(self) -> LinterReturn:
        """
        T006: coverage has been obsoleted by link
        """

        coverage = self.node.get('coverage')

        if coverage is None:
            yield LinterOutcome.SKIP, "legacy 'coverage' field not detected"
            return

        yield LinterOutcome.FAIL, "the 'coverage' field has been obsoleted by 'link'"

    # Check if the format of Markdown file respects the specification
    # https://tmt.readthedocs.io/en/latest/spec/tests.html#manual
    def lint_manual_test_path_exists(self) -> LinterReturn:
        """
        T007: manual test path is not an actual path
        """

        if not self.manual:
            yield LinterOutcome.SKIP, 'not a manual test'
            return

        if not self.path:
            yield LinterOutcome.FAIL, 'directory path is not set'
            return

        if not self.manual_test_path.exists():
            yield LinterOutcome.FAIL, f'manual test path "{self.manual_test_path}" does not exist'
            return

        yield LinterOutcome.PASS, f'manual test path "{self.manual_test_path}" does exist'

    def lint_manual_valid_markdown(self) -> LinterReturn:
        """
        T008: manual test should be valid markdown
        """

        if not self.manual:
            yield LinterOutcome.SKIP, 'not a manual test'
            return

        try:
            warnings = tmt.export.check_md_file_respects_spec(self.manual_test_path)

        except tmt.utils.ConvertError as exc:
            yield LinterOutcome.FAIL, f'cannot open the manual test path: {exc}'
            return

        if warnings:
            for warning in warnings:
                # replace new-lines with a (single) space
                yield LinterOutcome.WARN, ' '.join(line.strip() for line in warning.splitlines())

            return

        yield LinterOutcome.PASS, 'correct manual test syntax'

    def lint_require_type_field(self) -> LinterReturn:
        """
        T009: require fields should have type field
        """

        self.node.get('require')
        missing_type = []

        # Check require items have type field
        complex_dependencies = [
            dependency
            for dependency in self.node.get('require', [])
            if isinstance(dependency, dict)
        ]

        if not complex_dependencies:
            yield LinterOutcome.SKIP, 'library/file requirements not used'
            return

        missing_type = [
            dependency for dependency in complex_dependencies if not dependency.get('type')
        ]

        if not missing_type:
            yield LinterOutcome.PASS, 'all library/file requirements specify type'
            return

        if not self.opt('fix'):
            yield LinterOutcome.FAIL, 'library/file requirements should specify type'
            return

        filename = self.fmf_sources[-1]
        # `rt` mode is needed here to preserve comments during the `fix`, see comment in `_yaml`
        metadata: dict[str, Any] = tmt.utils.yaml_to_dict(self.read(filename), yaml_type="rt")

        if not tmt.utils.is_key_origin(self.node, 'require') and all(
            dependency in metadata.get('require', []) for dependency in missing_type
        ):
            yield (
                LinterOutcome.FAIL,
                (
                    'some library/file requirement are missing type, '
                    'but inherited from test parent, please, fix manually'
                ),
            )
            return

        for dependency in metadata.get('require', []):
            if not isinstance(dependency, dict) or dependency.get('type'):
                continue

            dependency['type'] = 'file' if dependency.get('pattern') else 'library'

        self.write(filename, to_yaml(metadata, yaml_type="rt"))

        yield LinterOutcome.FIXED, 'added type to requirements'


@container(repr=False)
class LintableCollection(tmt.lint.Lintable['LintableCollection']):
    """
    Linting rules applied to a collection of Tests, Plans or Stories
    """

    def __init__(self, objs: list["Core"], *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)
        self.objs = objs

    def lint_no_duplicate_ids(self) -> LinterReturn:
        """
        G001: no duplicate ids
        """
        ids: collections.defaultdict[str, list[str]] = collections.defaultdict(list)
        for obj in self.objs:
            if obj.id is None:
                continue
            ids[obj.id].append(obj.name)

        duplicates = {obj_id: obj_names for obj_id, obj_names in ids.items() if len(obj_names) > 1}

        for obj_id, obj_names in duplicates.items():
            for name in obj_names:
                yield LinterOutcome.FAIL, f'duplicate id "{obj_id}" in "{name}"'

        if duplicates:
            return

        yield LinterOutcome.PASS, 'no duplicate ids detected'

    def print_header(self) -> None:
        echo(style("Lint checks on all", fg='red'))


def expand_node_data(data: T, fmf_context: FmfContext) -> T:
    """
    Recursively expand variables in node data
    """
    if isinstance(data, str):
        # Expand environment and context variables. This is a bit
        # tricky as we do need to process each type individually and
        # also properly handle variable/context name conflicts and
        # situations when some variable is now known.

        # First split data per $ which to avoid conflicts.
        split_data = data.split('$')

        # Don't process the first item as that was not a variable.
        first_item = split_data.pop(0)

        # Do the environment variable expansion for items not
        # starting with @. Each item starting with @ is marked
        # to be expanded later - we cannot blindly check whether
        # the item starts with @ to prevent expansion of already
        # expanded items later. Therefore we store a flag and the
        # item - if the flag is `True`, item has been expanded
        # already. We also store the original item - in case we
        # don't expand it at all, let's put the original value back
        # into the stream.
        # See https://github.com/teemtee/tmt/issues/2654.
        #
        # TID251: `pathlib` does not provide `os.patch.expandvars`, its
        # use is allowed.
        expanded_env: list[tuple[bool, str, str]] = [
            (False, item[1:], item)
            if item.startswith('@')
            else (True, os.path.expandvars(f'${item}'), item)  # noqa: TID251
            for item in split_data
        ]

        # Expand unexpanded items, this time with fmf context providing
        # additional values. This should resolve items starting with $@,
        # which are to be referencing fmf dimensions rather than environment
        # variables.
        expanded_ctx = [first_item]
        with Environment.from_fmf_context(fmf_context).as_environ():
            for was_expanded, item, original_item in expanded_env:
                if was_expanded:
                    expanded_ctx.append(item)
                    continue
                # TID251: `pathlib` does not provide `os.patch.expandvars`, its
                # use is allowed.
                expanded = os.path.expandvars(f'${item}')  # noqa: TID251
                expanded_ctx.append(f'${original_item}' if expanded.startswith('$') else expanded)

        # This cast is tricky: we get a string, and we return a
        # string, so T -> T hold, yet mypy does not recognize this,
        # and we need to help with an explicit cast().
        return cast(T, ''.join(expanded_ctx))

    if isinstance(data, dict):
        for key, value in data.items():
            data[key] = expand_node_data(value, fmf_context)
    elif isinstance(data, list):
        for i, item in enumerate(data):
            data[i] = expand_node_data(item, fmf_context)
    return data


class StoryPriority(enum.Enum):
    MUST_HAVE = 'must have'
    SHOULD_HAVE = 'should have'
    COULD_HAVE = 'could have'
    WILL_NOT_HAVE = 'will not have'

    # We need a custom "to string" conversion to support fmf's filter().
    def __str__(self) -> str:
        return self.value


@container(repr=False)
class Story(
    Core,
    tmt.export.Exportable['Story'],
    tmt.lint.Lintable['Story'],
):
    """
    User story object
    """

    example: list[str] = field(
        default_factory=list,
        normalize=tmt.utils.normalize_string_list,
    )
    # TODO: `story` is mandatory, but it's defined after attributes with default
    # values. Try to find a way how to drop the need for a dummy default.
    story: Optional[str] = None
    title: Optional[str] = None
    priority: Optional[StoryPriority] = field(
        default=cast(Optional['StoryPriority'], None),
        normalize=lambda key_address, raw_value, logger: (
            None if raw_value is None else StoryPriority(raw_value)
        ),
        exporter=lambda value: value.value if value is not None else None,
    )

    _KEYS_SHOW_ORDER = [
        'summary',
        'title',
        'story',
        'id',
        'priority',
        'description',
        'author',
        'contact',
        'example',
        'enabled',
        'order',
        'tag',
        'tier',
        'link',
    ]

    def __init__(
        self,
        *,
        node: fmf.Tree,
        tree: Optional['Tree'] = None,
        skip_validation: bool = False,
        raise_on_validation_error: bool = False,
        logger: tmt.log.Logger,
        **kwargs: Any,
    ) -> None:
        """
        Initialize the story
        """
        super().__init__(
            node=node,
            tree=tree,
            logger=logger,
            skip_validation=skip_validation,
            raise_on_validation_error=raise_on_validation_error,
            **kwargs,
        )
        self._update_metadata()

    # Override the parent implementation - it would try to call `Tree.storys()`...
    @classmethod
    def from_tree(cls, tree: 'tmt.Tree') -> list['Story']:
        return tree.stories()

    @property
    def documented(self) -> list['Link']:
        """
        Return links to relevant documentation
        """
        return self.link.get('documented-by') if self.link else []

    @property
    def verified(self) -> list['Link']:
        """
        Return links to relevant test coverage
        """
        return self.link.get('verified-by') if self.link else []

    @property
    def implemented(self) -> list['Link']:
        """
        Return links to relevant source code
        """
        return self.link.get('implemented-by') if self.link else []

    @property
    def status(self) -> list[str]:
        """
        Aggregate story status from implemented-, verified- and documented-by links
        """
        status = []

        if self.implemented:
            status.append('implemented')

        if self.verified:
            status.append('verified')

        if self.documented:
            status.append('documented')

        return status

    def _match(
        self,
        implemented: bool,
        verified: bool,
        documented: bool,
        covered: bool,
        unimplemented: bool,
        unverified: bool,
        undocumented: bool,
        uncovered: bool,
    ) -> bool:
        """
        Return true if story matches given conditions
        """
        if implemented and not self.implemented:
            return False
        if verified and not self.verified:
            return False
        if documented and not self.documented:
            return False
        if unimplemented and self.implemented:
            return False
        if unverified and self.verified:
            return False
        if undocumented and self.documented:
            return False
        if uncovered and (self.implemented and self.verified and self.documented):
            return False
        if covered and not (self.implemented and self.verified and self.documented):
            return False
        return True

    @staticmethod
    def create(
        *,
        names: list[str],
        template: str,
        path: Path,
        force: bool = False,
        dry: Optional[bool] = None,
        logger: tmt.log.Logger,
    ) -> None:
        """
        Create a new story
        """
        if dry is None:
            dry = Story._opt('dry')

        # Get story template
        if tmt.utils.is_url(template):
            story_content = tmt.templates.MANAGER.render_from_url(template, logger)
        else:
            story_templates = tmt.templates.MANAGER.templates['story']
            try:
                story_content = tmt.templates.MANAGER.render_file(story_templates[template])
            except KeyError as error:
                raise tmt.utils.GeneralError(f"Invalid template '{template}'.") from error

        # Append link with appropriate relation
        links = Links(data=list(cast(list[_RawLink], Story._opt('link', []))))
        if links:  # Output 'links' if and only if it is not empty
            story_content += to_yaml({'link': links.to_spec()})

        for story_name in names:
            story_path = path / Path(story_name).unrooted()

            if story_path.suffix != '.fmf':
                story_path = story_path.parent / f'{story_path.name}.fmf'

            # Create directory & story
            tmt.utils.create_directory(
                path=story_path.parent, name='story directory', dry=dry, logger=logger
            )

            tmt.utils.create_file(
                path=story_path,
                name='story',
                content=story_content,
                dry=dry,
                force=force,
                logger=logger,
            )

            if links.get('verifies') and dry is False:
                stories = Tree(path=path, logger=logger).stories(
                    names=[f"^{story_name}$"], apply_command_line=False
                )
                tmt.utils.jira.link(tmt_objects=stories, links=links, logger=logger)

    @staticmethod
    def overview(tree: 'Tree') -> None:
        """
        Show overview of available stories
        """
        stories = [style(str(story), fg='red') for story in tree.stories()]
        echo(
            style(
                'Found {}{}{}.'.format(
                    listed(stories, 'story'), ': ' if stories else '', listed(stories, max=12)
                ),
                fg='blue',
            )
        )

    def show(self) -> None:
        """
        Show story details
        """
        self.ls()
        for key in self._KEYS_SHOW_ORDER:
            _, _, value, _, metadata = container_field(self, key)

            if metadata.internal:
                continue

            if key == 'link':
                value.show()
                continue
            if key == 'priority' and value is not None:
                value = cast(StoryPriority, value).value
            if key == 'order' and value == DEFAULT_ORDER:
                continue
            if key == 'example' and value:
                echo(tmt.utils.format(key, value, wrap=False))
                continue
            if value is not None and value != []:
                wrap: tmt.utils.FormatWrap = False if key == 'example' else 'auto'
                echo(tmt.utils.format(key, value, wrap=wrap))
        if self.verbosity_level:
            self._show_additional_keys()

    def coverage(self, code: bool, test: bool, docs: bool) -> tuple[bool, bool, bool]:
        """
        Show story coverage
        """
        if code:
            code = bool(self.implemented)
            verdict(code, good='done ', bad='todo ', nl=False)
        if test:
            test = bool(self.verified)
            verdict(test, good='done ', bad='todo ', nl=False)
        if docs:
            docs = bool(self.documented)
            verdict(docs, good='done ', bad='todo ', nl=False)
        echo(self)
        return (code, test, docs)

    # FIXME - Make additional attributes configurable
    def lint_unknown_keys(self) -> LinterReturn:
        """
        S001: all keys are known
        """

        invalid_keys = self._lint_keys(EXTRA_STORY_KEYS)

        if invalid_keys:
            for key in invalid_keys:
                yield LinterOutcome.FAIL, f'unknown key "{key}" is used'

            return

        yield LinterOutcome.PASS, 'correct keys are used'

    def lint_story(self) -> LinterReturn:
        """
        S002: story key must be defined
        """

        if not self.node.get('story'):
            yield LinterOutcome.FAIL, 'story is required'
            return

        yield LinterOutcome.PASS, 'story key is defined'


Test.discover_linters()
Story.discover_linters()
LintableCollection.discover_linters()


class Tree(tmt.utils.Common):
    """
    Test Metadata Tree
    """

    def __init__(
        self,
        *,
        path: Optional[Path] = None,
        tree: Optional[fmf.Tree] = None,
        fmf_context: Optional[FmfContext] = None,
        additional_rules: Optional[list[_RawAdjustRule]] = None,
        logger: tmt.log.Logger,
    ) -> None:
        """
        Initialize tmt tree from directory path or given fmf tree
        """

        # TODO: not calling parent __init__ on purpose?
        self.inject_logger(logger)

        self._path = path or Path.cwd()
        self._tree = tree
        self._custom_fmf_context = fmf_context or FmfContext()
        self._additional_rules = additional_rules

    @classmethod
    def grow(
        cls,
        *,
        path: Optional[Path] = None,
        tree: Optional[fmf.Tree] = None,
        fmf_context: Optional[FmfContext] = None,
        logger: Optional[tmt.log.Logger] = None,
    ) -> 'Tree':
        """
        Initialize tmt tree from directory path or given fmf tree.

        This method serves as an entry point for interactive use of
        tmt-as-a-library, providing sane defaults.

        .. warning::

           This method has a **very** limited use case, i.e. to help
           bootstrapping interactive tmt sessions. Using it anywhere
           outside of this scope should be ruled out.
        """

        import tmt.plugins

        logger = logger or tmt.log.Logger.create()

        tmt.plugins.explore(logger)

        return Tree(
            path=path or Path.cwd(),
            tree=tree,
            fmf_context=fmf_context,
            logger=logger or tmt.log.Logger.create(),
        )

    @property
    def fmf_context(self) -> FmfContext:
        """
        Use custom fmf context if provided, default otherwise
        """
        return self._custom_fmf_context or super().fmf_context

    def _filters_conditions(
        self,
        nodes: Sequence[CoreT],
        filters: list[str],
        conditions: list[str],
        links: list['LinkNeedle'],
        includes: list[str],
        excludes: list[str],
    ) -> list[CoreT]:
        """
        Apply filters and conditions, return pruned nodes
        """

        result = []
        for node in nodes:
            filter_vars = copy.deepcopy(node._metadata)
            cond_vars = node._metadata

            # Add a lowercase version of bool variables for filtering
            bool_vars = {
                key: [value, str(value).lower()]
                for key, value in filter_vars.items()
                if isinstance(value, bool)
            }
            filter_vars.update(bool_vars)

            # Conditions
            try:
                if not all(
                    fmf.utils.evaluate(condition, cond_vars, node) for condition in conditions
                ):
                    continue
            except fmf.utils.FilterError:
                # Handle missing attributes as if condition failed
                continue
            except Exception as error:
                raise tmt.utils.GeneralError("Invalid --condition raised exception.") from error

            # Filters
            try:
                if not all(
                    fmf.utils.filter(filter_, filter_vars, regexp=True) for filter_ in filters
                ):
                    continue
            except fmf.utils.FilterError:
                # Handle missing attributes as if filter failed
                continue

            # Links
            try:
                # Links are in OR relation
                if links and all(not node.has_link(needle) for needle in links):
                    continue
            except Exception as exc:
                # Handle broken link as not matching
                self.debug(f'Invalid link ignored, exception was {exc}')
                continue

            # Exclude
            if any(re.search(pattern, node.name) for pattern in excludes):
                continue

            # Include
            if includes and not any(re.search(pattern, node.name) for pattern in includes):
                continue

            result.append(node)

        return result

    def sanitize_cli_names(self, names: list[str]) -> list[str]:
        """
        Sanitize CLI names in case name includes control character
        """
        for name in names:
            if not name.isprintable():
                raise tmt.utils.GeneralError(f"Invalid name {name!r} as it's not printable.")
        return names

    @property
    def tree(self) -> fmf.Tree:
        """
        Initialize tree only when accessed
        """
        if self._tree is None:
            try:
                self._tree = fmf.Tree(str(self._path))
            except fmf.utils.RootError as error:
                raise tmt.utils.MetadataError(
                    f"No metadata found in the '{self._path}' directory. "
                    f"Use 'tmt init' to get started."
                ) from error
            except fmf.utils.FileError as error:
                raise tmt.utils.GeneralError("Invalid yaml syntax.") from error
            # Adjust metadata for current fmf context
            self._tree.adjust(
                fmf.context.Context(**self.fmf_context),
                case_sensitive=False,
                decision_callback=create_adjust_callback(self._logger),
                additional_rules=self._additional_rules,
            )
        return self._tree

    @tree.setter
    def tree(self, new_tree: fmf.Tree) -> None:
        self._tree = new_tree

    @property
    def root(self) -> Optional[Path]:
        """
        Metadata root
        """
        if self.tree is None or self.tree.root is None:
            return None

        return Path(self.tree.root)

    def tests(
        self,
        logger: Optional[tmt.log.Logger] = None,
        keys: Optional[list[str]] = None,
        names: Optional[list[str]] = None,
        filters: Optional[list[str]] = None,
        conditions: Optional[list[str]] = None,
        unique: bool = True,
        links: Optional[list['LinkNeedle']] = None,
        includes: Optional[list[str]] = None,
        excludes: Optional[list[str]] = None,
        apply_command_line: bool = True,
        sort: bool = True,
    ) -> list[Test]:
        """
        Search available tests
        """
        # Handle defaults, apply possible command line options
        logger = logger or self._logger
        keys = (keys or []) + ['test']
        names = names or []
        filters = filters or []
        conditions = conditions or []
        # FIXME: cast() - typeless "dispatcher" method
        links = (links or []) + [
            LinkNeedle.from_spec(value) for value in cast(list[str], Test._opt('links', []))
        ]
        includes = includes or []
        excludes = excludes or []
        # Used in: tmt run test --name NAME, tmt test ls NAME...
        cmd_line_names: list[str] = []

        if apply_command_line:
            filters += list(Test._opt('filters', []))
            conditions += list(Test._opt('conditions', []))
            includes += list(Test._opt('include', []))
            excludes += list(Test._opt('exclude', []))
            cmd_line_names = list(Test._opt('names', []))

        # Sanitize test names to make sure no name includes control character
        cmd_line_names = self.sanitize_cli_names(cmd_line_names)

        def name_filter(nodes: Iterable[fmf.Tree]) -> list[fmf.Tree]:
            """
            Filter nodes based on names provided on the command line
            """
            if not cmd_line_names:
                return list(nodes)
            return [
                node
                for node in nodes
                if any(re.search(name, node.name) for name in cmd_line_names)
            ]

        # Append post filter to support option --enabled or --disabled
        if Test._opt('enabled'):
            filters.append('enabled:true')
        if Test._opt('disabled'):
            filters.append('enabled:false')

        # As the first step, let's build the list of test objects based
        # on keys and names/sources

        # Pick tests based on the source files names
        if Test._opt('source'):
            tests = [
                Test(node=test, logger=self._logger.descend())
                for test in self.tree.prune(keys=keys, sources=cmd_line_names, sort=sort)
            ]

        # If duplicate test names are allowed, match test name/regexp
        # one-by-one and preserve the order of tests within a plan.
        elif not unique and names:
            tests = []
            for name in names:
                selected_tests = [
                    Test(
                        node=test,
                        tree=self,
                        logger=logger.descend(
                            logger_name=test.get('name', None)
                        ),  # .apply_verbosity_options(**self._options),
                    )
                    for test in name_filter(self.tree.prune(keys=keys, names=[name], sort=sort))
                ]
                tests.extend(sorted(selected_tests, key=lambda test: test.order))

        # Otherwise just perform a regular key/name filtering
        else:
            selected_tests = [
                Test(
                    node=test,
                    tree=self,
                    logger=logger.descend(
                        logger_name=test.get('name', None)
                    ),  # .apply_verbosity_options(**self._options),
                )
                for test in name_filter(self.tree.prune(keys=keys, names=names, sort=sort))
            ]
            tests = sorted(selected_tests, key=lambda test: test.order)

        # Apply filters & conditions
        return self._filters_conditions(
            nodes=tests,
            filters=filters,
            conditions=conditions,
            links=links,
            includes=includes,
            excludes=excludes,
        )

    def plans(
        self,
        logger: Optional[tmt.log.Logger] = None,
        keys: Optional[list[str]] = None,
        names: Optional[list[str]] = None,
        filters: Optional[list[str]] = None,
        conditions: Optional[list[str]] = None,
        run: Optional['Run'] = None,
        links: Optional[list['LinkNeedle']] = None,
        excludes: Optional[list[str]] = None,
        apply_command_line: bool = True,
        resolve_enabled_only: bool = False,
    ) -> list["Plan"]:
        """
        Search available plans
        """
        from tmt.base.plan import Plan

        # Handle defaults, apply possible command line options
        logger = logger or (run._logger if run is not None else self._logger)
        local_plan_keys = (keys or []) + ['execute']
        remote_plan_keys = (keys or []) + ['plan']
        names = names or []
        filters = filters or []
        conditions = conditions or []
        # FIXME: cast() - typeless "dispatcher" method
        links = (links or []) + [
            LinkNeedle.from_spec(value) for value in cast(list[str], Plan._opt('links', []))
        ]
        excludes = excludes or []

        if apply_command_line:
            names += list(Plan._opt('names', []))
            filters += list(Plan._opt('filters', []))
            conditions += list(Plan._opt('conditions', []))
            excludes += list(Plan._opt('exclude', []))

        # Sanitize plan names to make sure no name includes control character
        names = self.sanitize_cli_names(names)

        # Append post filter to support option --enabled or --disabled
        if Plan._opt('enabled'):
            filters.append('enabled:true')
        if Plan._opt('disabled'):
            filters.append('enabled:false')

        # For --source option use names as sources
        if Plan._opt('source'):
            sources = names
            names = None
        else:
            sources = None

        # Build the list, convert to objects, sort and filter
        local_plans = list(self.tree.prune(keys=local_plan_keys, names=names, sources=sources))
        importing_plans = list(
            self.tree.prune(
                keys=remote_plan_keys,
                names=None if self.import_before_name_filter else names,
                sources=sources,
            )
        )

        for plan in importing_plans:
            if plan in local_plans:
                logger.warning(
                    f"Plan '{plan}' defines both 'execute' and 'import',"
                    " ignoring the 'execute' step."
                )

                local_plans.remove(plan)

        plans = [
            Plan(
                node=plan,
                tree=self,
                logger=logger.descend(
                    logger_name=plan.get('name', None), extra_shift=0
                ).apply_verbosity_options(cli_invocation=Plan.cli_invocation),
                run=run,
            )
            for plan in [*local_plans, *importing_plans]
        ]

        if run is not None:
            for policy in run.policies:
                policy.apply_to_plans(plans=plans, logger=logger)

        if Plan._opt('enabled') or ('enabled:true' in filters):
            resolve_enabled_only = True

        if not Plan._opt('shallow'):
            unresolved_plans = plans
            plans = []
            for plan in unresolved_plans:
                # Do not resolve disabled plans unless forced to
                if resolve_enabled_only and not plan.enabled:
                    plan.debug(
                        f"Plan '{plan.name}' is not enabled, skipping imports resolution.",
                    )
                    continue
                try:
                    plans += plan.resolve_imports()
                except Exception as error:
                    if self.import_before_name_filter:
                        # If we filter later, we can skip some resolve failures
                        # since it may be unrelated
                        tmt.utils.show_exception_as_warning(
                            message=f"Failed to import plan '{plan.name}'",
                            exception=error,
                            logger=logger,
                        )
                    else:
                        # Otherwise the filter was already applied and the resolve failure
                        # is an error
                        raise

        # Do the name filter after the import
        if self.import_before_name_filter and names:
            plans = [plan for plan in plans if any(re.search(name, plan.name) for name in names)]

        return self._filters_conditions(
            nodes=sorted(plans, key=lambda plan: plan.order),
            filters=filters,
            conditions=conditions,
            links=links,
            includes=[],
            excludes=excludes,
        )

    def stories(
        self,
        logger: Optional[tmt.log.Logger] = None,
        keys: Optional[list[str]] = None,
        names: Optional[list[str]] = None,
        filters: Optional[list[str]] = None,
        conditions: Optional[list[str]] = None,
        whole: bool = False,
        links: Optional[list['LinkNeedle']] = None,
        excludes: Optional[list[str]] = None,
        apply_command_line: Optional[bool] = True,
    ) -> list[Story]:
        """
        Search available stories
        """
        # Handle defaults, apply possible command line options
        logger = logger or self._logger
        keys = (keys or []) + ['story']
        names = names or []
        filters = filters or []
        conditions = conditions or []
        # FIXME: cast() - typeless "dispatcher" method
        links = (links or []) + [
            LinkNeedle.from_spec(value) for value in cast(list[str], Story._opt('links', []))
        ]
        excludes = excludes or []

        if apply_command_line:
            names += list(Story._opt('names', []))
            filters += list(Story._opt('filters', []))
            conditions += list(Story._opt('conditions', []))
            excludes += list(Story._opt('exclude', []))

        # Sanitize story names to make sure no name includes control character
        names = self.sanitize_cli_names(names)

        # Append post filter to support option --enabled or --disabled
        if Story._opt('enabled'):
            filters.append('enabled:true')
        if Story._opt('disabled'):
            filters.append('enabled:false')

        # For --source option use names as sources
        if Story._opt('source'):
            sources = names
            names = None
        else:
            sources = None

        # Build the list, convert to objects, sort and filter
        stories = [
            Story(node=story, tree=self, logger=logger.descend())
            for story in self.tree.prune(keys=keys, names=names, whole=whole, sources=sources)
        ]
        return self._filters_conditions(
            nodes=sorted(stories, key=lambda story: story.order),
            filters=filters,
            conditions=conditions,
            links=links,
            includes=[],
            excludes=excludes,
        )

    @staticmethod
    def init(*, path: Path, template: str, force: bool, logger: tmt.log.Logger) -> None:
        """
        Initialize a new tmt tree, optionally with a template
        """
        from tmt.base.plan import Plan

        path = path.resolve()
        dry = Tree._opt('dry')

        # Check for existing tree
        tree: Optional[Tree] = None

        try:
            tree = Tree(logger=logger, path=path)
            # Are we creating a new tree under the existing one?
            assert tree is not None  # narrow type
            if path == tree.root:
                echo(f"The fmf tree root '{tree.root}/.fmf' already exists.")
            else:
                # Are we creating a nested tree?
                echo(f"Path '{path}' already has a parent fmf tree root '{tree.root}/.fmf'.")
                if not force and not confirm("Do you really want to initialize a nested tree?"):
                    return
                tree = None
        except tmt.utils.GeneralError:
            tree = None

        # Create a new tree
        fmf_dir = path / '.fmf'
        if tree is None:
            if dry:
                echo(f"Would initialize the fmf tree root '{fmf_dir}'.")
            else:
                try:
                    fmf.Tree.init(path)
                    tree = Tree(logger=logger, path=path)
                    assert tree.root is not None  # narrow type
                    path = tree.root
                except fmf.utils.GeneralError as error:
                    raise tmt.utils.GeneralError(
                        f"Failed to initialize the fmf tree root '{fmf_dir}'."
                    ) from error
                echo(f"Initialized the fmf tree root '{fmf_dir}'.")

        # Add .fmf directory to the git index if possible
        if tmt.utils.git.git_root(fmf_root=path, logger=logger):
            if dry:
                echo(f"Path '{fmf_dir}' would be added to git index.")
            else:
                try:
                    tmt.utils.git.git_add(path=fmf_dir, logger=logger)
                    echo(f"Path '{fmf_dir}' added to git index.")
                except tmt.utils.GeneralError as error:
                    message = error.message
                    if isinstance(error.__cause__, tmt.utils.RunError) and error.__cause__.stderr:
                        message += " " + " ".join(error.__cause__.stderr.splitlines())
                    echo(message)

        # Populate the tree with example objects if requested
        if template == 'empty':
            choices = listed(tmt.templates.INIT_TEMPLATES, join='or', quote="'")
            echo(f"Use 'tmt init --template' with {choices} to create example content.")
            echo(
                "Add tests, plans or stories with 'tmt test create', "
                "'tmt plan create' or 'tmt story create'."
            )
        else:
            echo(f"Applying template '{template}'.")

        if template == 'mini':
            Plan.create(
                names=['/plans/example'],
                template='mini',
                path=path,
                force=force,
                dry=dry,
                logger=logger,
            )
        elif template == 'base':
            tmt.Test.create(
                names=['/tests/example'],
                template='beakerlib',
                path=path,
                force=force,
                dry=dry,
                logger=logger,
            )
            Plan.create(
                names=['/plans/example'],
                template='base',
                path=path,
                force=force,
                dry=dry,
                logger=logger,
            )
        elif template == 'full':
            tmt.Test.create(
                names=['/tests/example'],
                template='shell',
                path=path,
                force=force,
                dry=dry,
                logger=logger,
            )
            Plan.create(
                names=['/plans/example'],
                template='full',
                path=path,
                force=force,
                dry=dry,
                logger=logger,
            )
            tmt.Story.create(
                names=['/stories/example'],
                template='full',
                path=path,
                force=force,
                dry=dry,
                logger=logger,
            )


class Status(tmt.utils.Common):
    """
    Status of tmt work directories.
    """

    LONGEST_STEP = max(tmt.steps.STEPS, key=lambda k: len(k))
    FIRST_COL_LEN = len(LONGEST_STEP) + 2

    @staticmethod
    def get_overall_plan_status(
        plan: "Plan",
    ) -> Union[Literal["done", "todo"], tmt.steps.StepName]:
        """
        Examines the plan status (find the last done step)
        """
        steps = list(plan.steps())
        step_names = list(plan.step_names())
        for i in range(len(steps) - 1, -1, -1):
            if steps[i].status() == 'done':
                if i + 1 == len(steps):
                    # Last enabled step, consider the whole plan done
                    return 'done'
                return step_names[i]
        return 'todo'

    def plan_matches_filters(self, plan: "Plan") -> bool:
        """
        Check if the given plan matches filters from the command line
        """
        if self.opt('abandoned'):
            return plan.provision.status() == 'done' and plan.cleanup.status() == 'todo'
        if self.opt('active'):
            return any(step.status() == 'todo' for step in plan.steps())
        if self.opt('finished'):
            return all(step.status() == 'done' for step in plan.steps())
        return True

    @staticmethod
    def colorize_column(content: str) -> str:
        """
        Add color to a status column
        """
        if 'done' in content:
            return style(content, fg='green')
        return style(content, fg='yellow')

    @classmethod
    def pad_with_spaces(cls, string: str) -> str:
        """
        Append spaces to string to properly align the first column
        """
        return string + (cls.FIRST_COL_LEN - len(string)) * ' '

    def run_matches_filters(self, run: 'Run') -> bool:
        """
        Check if the given run matches filters from the command line
        """
        if self.opt('abandoned') or self.opt('active'):
            # Any of the plans must be abandoned/active for the whole
            # run to be abandoned/active
            return any(self.plan_matches_filters(p) for p in run.plans)
        if self.opt('finished'):
            # All plans must be finished for the whole run to be finished
            return all(self.plan_matches_filters(p) for p in run.plans)
        return True

    def print_run_status(self, run: 'Run') -> None:
        """
        Display the overall status of the run
        """
        if not self.run_matches_filters(run):
            return
        # Find the earliest step in all plans' status
        earliest_step_index = len(tmt.steps.STEPS)
        for plan in run.plans:
            plan_status = self.get_overall_plan_status(plan)
            if plan_status == 'done':
                continue
            if plan_status == 'todo':
                # If plan has no steps done, consider the whole run not done
                earliest_step_index = -1
                break
            plan_status_index = tmt.steps.STEPS.index(plan_status)
            earliest_step_index = min(plan_status_index, earliest_step_index)

        if earliest_step_index == len(tmt.steps.STEPS):
            run_status = 'done'
        elif earliest_step_index == -1:
            run_status = 'todo'
        else:
            run_status = tmt.steps.STEPS[earliest_step_index]
        run_status = self.colorize_column(self.pad_with_spaces(run_status))
        echo(run_status, nl=False)
        echo(run.run_workdir)

    def print_plans_status(self, run: 'Run') -> None:
        """
        Display the status of each plan of the given run
        """
        for plan in run.plans:
            if self.plan_matches_filters(plan):
                plan_status = self.get_overall_plan_status(plan)
                echo(self.colorize_column(self.pad_with_spaces(plan_status)), nl=False)
                echo(f'{run.run_workdir}  {plan.name}')

    def print_verbose_status(self, run: 'Run') -> None:
        """
        Display the status of each step of the given run
        """
        for plan in run.plans:
            if self.plan_matches_filters(plan):
                for step in plan.steps(enabled_only=False):
                    column = (step.status() or '----') + ' '
                    echo(self.colorize_column(column), nl=False)
                echo(f' {run.run_workdir}  {plan.name}')

    def process_run(self, run: 'Run') -> None:
        """
        Display the status of the given run based on verbosity
        """
        loaded, error = tmt.utils.load_run(run)
        if not loaded:
            self.warn(f"Failed to load run '{run.run_workdir}': {error}")
            return
        if self.verbosity_level == 0:
            self.print_run_status(run)
        elif self.verbosity_level == 1:
            self.print_plans_status(run)
        else:
            self.print_verbose_status(run)

    def print_header(self) -> None:
        """
        Print the header of the status table based on verbosity
        """
        header = ''
        if self.verbosity_level >= 2:
            for step in tmt.steps.STEPS:
                header += step[0:4] + ' '
            header += ' '
        else:
            header = self.pad_with_spaces('status')
        header += 'id'
        echo(style(header, fg='blue'))

    def show(self) -> None:
        """
        Display the current status
        """

        # TODO: avoid circular import - hopefully this would disappear
        # once we move `Status` into its own module.
        from tmt.base.run import Run

        # Prepare absolute workdir path if --id was used
        root_path = Path(self.workdir_root)
        self.print_header()
        assert self._cli_context_object is not None  # narrow type
        assert self._cli_context_object.tree is not None  # narrow type
        for abs_path in tmt.utils.generate_runs(root_path, self.opt('id')):
            run = Run(
                logger=self._logger,
                id_=abs_path,
                tree=self._cli_context_object.tree,
                cli_invocation=self.cli_invocation,
            )
            self.process_run(run)


CleanCallback = Callable[[], bool]


def _dir_size(path: Path) -> 'Quantity':
    """Return the total size in bytes of all files under path."""
    return tmt.hardware.UNITS(f'{sum(f.lstat().st_size for f in path.rglob("*"))} bytes')


class Clean(tmt.utils.Common):
    """
    A class for cleaning up workdirs, guests or images
    """

    def __init__(
        self,
        *,
        parent: Optional[tmt.utils.Common] = None,
        name: Optional[str] = None,
        workdir: tmt.utils.WorkdirArgumentType = None,
        workdir_root: Optional[Path] = None,
        cli_invocation: Optional['tmt.cli.CliInvocation'] = None,
        logger: tmt.log.Logger,
    ) -> None:
        """
        Initialize name and relation with the parent object

        Always skip to initialize the work tree.
        """

        # Set the option to skip to initialize the work tree
        if cli_invocation and cli_invocation.context:
            cli_invocation.context.params[tmt.utils.PLAN_SKIP_WORKTREE_INIT] = True

        super().__init__(
            logger=logger,
            parent=parent,
            name=name,
            workdir=workdir,
            workdir_root=workdir_root,
            cli_invocation=cli_invocation,
        )

    def images(self) -> bool:
        """
        Clean images of provision plugins
        """
        self.info('images', color='blue')
        successful = True
        for method in tmt.steps.provision.ProvisionPlugin.methods():
            if not method.class_.clean_images(  # type: ignore[attr-defined]
                self, self.is_dry_run, self.workdir_root
            ):
                successful = False
        return successful

    def _matches_how(self, plan: "Plan") -> bool:
        """
        Check if the given plan matches options
        """
        how = plan.provision.data[0].how
        # FIXME: cast() - typeless "dispatcher" method
        target_how = cast(Optional[str], self.opt('how'))
        if target_how:
            return how == target_how
        # No target provision method, always matches
        return True

    def _stop_running_guests(self, run: 'Run') -> bool:
        """
        Stop all running guests of a run
        """
        loaded, error = tmt.utils.load_run(run)
        if not loaded:
            self.warn(f"Failed to load run '{run.run_workdir}': {error}")
            return False
        # Clean guests if provision is done but cleanup is not done
        successful = True
        for plan in run.plans:
            if plan.provision.status() == 'done' and plan.cleanup.status() != 'done':
                # Wake up provision to load the active guests
                plan.provision.wake()
                if not self._matches_how(plan):
                    continue
                if self.is_dry_run:
                    self.verbose(
                        f"Would stop guests in run '{run.run_workdir}' plan '{plan.name}'.",
                        shift=1,
                    )
                else:
                    self.verbose(
                        f"Stopping guests in run '{run.run_workdir}' plan '{plan.name}'.", shift=1
                    )
                    # Set --quiet to avoid finish logging to terminal

                    assert self.cli_invocation is not None  # narrow type

                    quiet = self.cli_invocation.options['quiet']
                    self.cli_invocation.options['quiet'] = True
                    try:
                        plan.cleanup.go()
                    except tmt.utils.GeneralError as error:
                        self.warn(
                            f"Could not stop guest in run '{run.run_workdir}': {error}.", shift=1
                        )
                        successful = False
                    finally:
                        self.cli_invocation.options['quiet'] = quiet
        return successful

    def guests(self, run_ids: tuple[str, ...], keep: Optional[int]) -> bool:
        """
        Clean guests of runs
        """
        self.info('guests', color='blue')
        self.verbose('workdir root', self.workdir_root)
        if self.opt('last'):
            # TODO: avoid circular import - hopefully this would disappear
            # once we split `Clean` into plugins
            from tmt.base.run import Run

            # Pass the context containing --last to Run to choose
            # the correct one.
            return self._stop_running_guests(
                Run(
                    logger=self._logger,
                    cli_invocation=self.cli_invocation,
                    workdir_root=self.workdir_root,
                )
            )
        successful = True
        assert self._cli_context_object is not None  # narrow type
        all_workdirs = list(tmt.utils.generate_runs(self.workdir_root, run_ids))
        if keep is not None:
            # Sort by modify time of the workdirs to keep the newest guests
            all_workdirs.sort(
                key=lambda workdir: Path(workdir / 'run.yaml').stat().st_mtime, reverse=True
            )
            all_workdirs = all_workdirs[keep:]

        # TODO: avoid circular import - hopefully this would disappear
        # once we split `Clean` into plugins
        from tmt.base.run import Run

        for abs_path in all_workdirs:
            run = Run(
                logger=self._logger,
                id_=abs_path,
                tree=self._cli_context_object.tree,
                cli_invocation=self.cli_invocation,
                workdir_root=self.workdir_root,
            )
            if not self._stop_running_guests(run):
                successful = False
        return successful

    def _clean_workdir(self, path: Path) -> tuple[bool, 'Quantity']:
        """
        Remove a workdir (unless in dry mode)
        """
        size = _dir_size(path)
        formatted_size = tmt.hardware.format_compact(size)
        if self.is_dry_run:
            self.verbose(f"Would remove workdir '{path}' ({formatted_size}).", shift=1)
        else:
            self.verbose(f"Removing workdir '{path}' ({formatted_size}).", shift=1)
            try:
                shutil.rmtree(path)
            except OSError as error:
                self.warn(f"Failed to remove '{path}': {error}.", shift=1)
                return False, tmt.hardware.UNITS('0 bytes')
        return True, size

    def runs(self, id_: tuple[str, ...], keep: Optional[int]) -> bool:
        """
        Clean workdirs of runs
        """
        self.info('runs', color='blue')
        self.verbose('workdir root', self.workdir_root)
        if self.opt('last'):
            # TODO: avoid circular import - hopefully this would disappear
            # once we split `Clean` into plugins
            from tmt.base.run import Run

            # Pass the context containing --last to Run to choose
            # the correct one.
            last_run = Run(logger=self._logger, cli_invocation=self.cli_invocation)
            last_run.load_workdir(with_logfiles=False)
            successful, total_size = self._clean_workdir(last_run.run_workdir)
        else:
            all_workdirs = list(tmt.utils.generate_runs(self.workdir_root, id_, all_=True))
            if keep is not None:
                # Sort by change time of the workdirs and keep the newest workdirs
                all_workdirs.sort(key=lambda workdir: workdir.stat().st_ctime, reverse=True)
                all_workdirs = all_workdirs[keep:]

            successful = True
            total_size = tmt.hardware.UNITS('0 bytes')
            for workdir in all_workdirs:
                success, size = self._clean_workdir(workdir)
                if not success:
                    successful = False
                total_size += size  # type: ignore[misc]

        self.info(
            f"Summary: {'Would free' if self.is_dry_run else 'Freed'} "
            f"{tmt.hardware.format_compact(total_size)} "
            f"of disk space.",
            shift=1,
        )
        return successful


@container
class LinkNeedle:
    """
    A container to use for searching links.

    ``relation`` and ``target`` fields hold regular expressions that
    are to be searched for in the corresponding fields of :py:class:`Link`
    instances.
    """

    relation: str = r'.*'
    target: str = r'.*'

    @classmethod
    def from_spec(cls, value: str) -> 'LinkNeedle':
        """
        Convert from a specification file or from a CLI option

        Specification is described in [1], this constructor takes care
        of parsing it into a corresponding ``LinkNeedle`` instance.

        [1] https://tmt.readthedocs.io/en/stable/plugins/discover.html#fmf
        """

        parts = value.split(':', maxsplit=1)

        if len(parts) == 1:
            return LinkNeedle(target=parts[0])

        return LinkNeedle(relation=parts[0], target=parts[1])

    def __str__(self) -> str:
        return f'{self.relation}:{self.target}'

    def matches(self, link: 'Link') -> bool:
        """
        Find out whether a given link matches this needle
        """

        # Rule out the simple case, mismatching relation.
        if not re.search(self.relation, link.relation):
            return False

        # If the target is a string, the test is trivial.
        if isinstance(link.target, str):
            return re.search(self.target, link.target) is not None

        # If the target is an fmf id, the current basic implementation will
        # check just the `name` key, if it's defined. More fields may come
        # later, pending support for more sophisticated parsing of link
        # needle on a command line.
        if link.target.name:
            return re.search(self.target, link.target.name) is not None

        return False


@container
class Link(SpecBasedContainer[Any, _RawLinkRelation]):
    """
    An internal "link" as defined by tmt specification.

    All links, after entering tmt internals, are converted from their raw
    representation into instances of this class.

    [1] https://tmt.readthedocs.io/en/stable/spec/core.html#link
    """

    DEFAULT_RELATIONSHIP: ClassVar[_RawLinkRelationName] = 'relates'

    relation: _RawLinkRelationName
    target: Union[str, FmfId]
    note: Optional[str] = None

    @classmethod
    def from_spec(cls, spec: _RawLink) -> 'Link':
        """
        Convert from a specification file or from a CLI option

        Specification is described in [1], this constructor takes care
        of parsing it into a corresponding ``Link`` instance.

        [1] https://tmt.readthedocs.io/en/stable/spec/core.html#link
        """

        # `spec` can be either a string, fmf id, or relation:target mapping with
        # a single key (modulo `note` key, of course).

        # String is simple: if `spec` is a string, it represents a [relation:]target,
        # and we use the default relationship if relation is not specified.
        if isinstance(spec, str):
            pattern = rf'(?:(?P<relation>{"|".join(Links._relations)}):)?(?P<target>.+)'
            result = re.match(pattern, spec)
            if result is None:
                raise tmt.utils.SpecificationError(
                    f"Invalid spec '{spec}' (should be [relation:]<target>)."
                )

            relation_target_pair = result.groupdict()
            assert relation_target_pair['target'] is not None
            relation = cast(
                _RawLinkRelationName, relation_target_pair['relation'] or Link.DEFAULT_RELATIONSHIP
            )
            target = relation_target_pair['target']
            return Link(relation=relation, target=target)

        # From now on, `spec` is a mapping, and may contain the optional
        # `note` key. Extract the key for later.
        # FIXME: cast() - typeless "dispatcher" method
        note = cast(Optional[str], spec.get('note', None))

        # Count how many relations are stored in spec.
        relations = [
            cast(_RawLinkRelationName, key)
            for key in spec
            if key not in ([*FmfId.VALID_KEYS, 'note'])
        ]

        # If there are no relations, spec must be an fmf id, representing
        # a target.
        if len(relations) == 0:
            return Link(
                relation=Link.DEFAULT_RELATIONSHIP,
                target=FmfId.from_spec(cast(_RawFmfId, spec)),
                note=note,
            )

        # More relations than 1 are a hard error, only 1 is allowed.
        if len(relations) > 1:
            raise tmt.utils.SpecificationError(
                f"Multiple relations specified for the link ({fmf.utils.listed(relations)})."
            )

        # At this point, we know there's just a single relation, its value is the target,
        # and note we already put aside.
        #
        # ignore[typeddict-item]: as far as mypy knows, we did not narrow the type of `spec`,
        # _RawFmfId is still in play - but we do know it's no longer possible because such a
        # value we ruled out thanks to `"no relations" check above. At this point,
        # the right side of relation must be _RawLinkTarget and nothing else. Helping
        # mypy to realize that.
        relation = relations[0]
        raw_target = cast(_RawLinkTarget, spec[relation])  # type: ignore[typeddict-item]

        # TODO: this should not happen with mandatory validation
        if relation not in Links._relations:
            raise tmt.utils.SpecificationError(
                f"Invalid link relation '{relation}' (should be "
                f"{fmf.utils.listed(Links._relations, join='or')})."
            )

        if isinstance(raw_target, str):
            return Link(relation=relation, target=raw_target, note=note)

        return Link(relation=relation, target=FmfId.from_spec(raw_target), note=note)

    def to_spec(self) -> _RawLinkRelation:
        """
        Convert to a form suitable for saving in a specification file

        No matter what the original specification was, every link will
        generate the very same type of specification, the ``relation: target``
        one.

        Output of this method is fully compatible with specification, and when
        given to :py:meth:`from_spec`, it shall create a ``Link`` instance
        with the same properties as the original one.

        [1] https://tmt.readthedocs.io/en/stable/spec/core.html#link
        """

        spec: _RawLinkRelation = {
            self.relation: self.target.to_spec() if isinstance(self.target, FmfId) else self.target
        }

        if self.note is not None:
            spec['note'] = self.note

        return spec


class Links(SpecBasedContainer[Any, list[_RawLinkRelation]]):
    """
    Collection of links in tests, plans and stories.

    Provides abstraction over the whole collection of object's links.

    [1] https://tmt.readthedocs.io/en/stable/spec/core.html#link
    """

    # The list of all supported link relations
    _relations: list[_RawLinkRelationName] = [
        'verifies',
        'verified-by',
        'implements',
        'implemented-by',
        'documents',
        'documented-by',
        'blocks',
        'blocked-by',
        'duplicates',
        'duplicated-by',
        'parent',
        'child',
        'relates',
        'test-script',
    ]

    _links: list[Link]

    def __init__(self, *, data: Optional[_RawLinks] = None):
        """
        Create a collection from raw link data
        """

        # TODO: this should not happen with mandatory validation
        if data is not None and not isinstance(data, (str, dict, list)):
            # TODO: deliver better key address, needs to know the parent
            raise tmt.utils.NormalizationError(
                'link', data, 'a string, a fmf id or a list of their combinations'
            )

        # Nothing to do if no data provided
        if data is None:
            self._links = []

            return

        specs = data if isinstance(data, list) else [data]

        # Ensure that each link is in the canonical form
        self._links = [Link.from_spec(spec) for spec in specs]

    @classmethod
    def from_spec(cls, spec: Union[_RawLink, list[_RawLink]]) -> Self:
        return cls(data=spec)

    def to_spec(self) -> list[_RawLinkRelation]:
        """
        Convert to a form suitable for saving in a specification file

        No matter what the original specification was, every link will
        generate the very same type of specification, the ``relation: target``
        one.

        Output of this method is fully compatible with specification, and when
        used to instantiate :py:meth:`Link` object, it shall create a collection
        of links with the same properties as the original one.

        [1] https://tmt.readthedocs.io/en/stable/spec/core.html#link
        """

        return [link.to_spec() for link in self._links]

    def get(self, relation: Optional[_RawLinkRelationName] = None) -> list[Link]:
        """
        Get links with given relation, all by default
        """
        return [link for link in self._links if relation is None or link.relation == relation]

    def show(self) -> None:
        """
        Format a list of links with their relations
        """
        for link in self._links:
            # TODO: needs a format for fmf id target
            echo(
                tmt.utils.format(
                    link.relation.rstrip('-by'), f"{link.target}", key_color='cyan', wrap=False
                )
            )

    def has_link(self, needle: Optional[LinkNeedle] = None) -> bool:
        """
        Check whether this set of links contains a matching link.

        If ``needle`` is left unspecified, method would take all links into
        account, as if the ``needle`` was match all possible links (``.*:.*``).
        Method would then answer the question "are there *any* links at all?"

        :param needle: if set, only links matching ``needle`` are considered. If
            not set, method considers all present links.
        :returns: ``True`` if there are matching links, ``False`` otherwise.
        """

        if needle is None:
            return bool(self._links)

        return any(needle.matches(link) for link in self._links)

    def __bool__(self) -> bool:
        return self.has_link()


def resolve_dynamic_ref(
    *,
    workdir: Path,
    ref: Optional[str],
    plan: Optional["Plan"] = None,
    logger: tmt.log.Logger,
) -> Optional[str]:
    """
    Get the final value for the dynamic reference

    Returns original ref if the dynamic referencing isn't used.
    Plan is used for context and environment expansion to process reference.
    Common instance is used for appropriate logging.
    """
    from tmt.base.plan import Plan

    # Nothing to do if no dynamic reference provided
    if not ref or not ref.startswith("@"):
        return ref

    # Prepare path of the dynamic reference file
    ref_filepath = workdir / ref[1:]
    if not ref_filepath.exists():
        raise tmt.utils.FileError(
            f"Dynamic 'ref' definition file '{ref_filepath}' does not exist."
        )
    logger.debug(f"Dynamic 'ref' definition file '{ref_filepath}' detected.")

    # Read it, process it and get the value of the attribute 'ref'
    try:
        data: dict[str, Any] = tmt.utils.yaml_to_dict(ref_filepath.read_text(encoding='utf-8'))

    except OSError as error:
        raise tmt.utils.FileError(f"Failed to read '{ref_filepath}'.") from error
    # Build a dynamic reference tree, adjust ref based on the context
    reference_tree = fmf.Tree(data=data)
    if not plan:
        raise tmt.utils.FileError("Cannot get plan fmf context to evaluate dynamic ref.")
    reference_tree.adjust(
        fmf.context.Context(**plan.fmf_context),
        case_sensitive=False,
        decision_callback=create_adjust_callback(logger),
    )
    # Also temporarily build a plan so that env and context variables are expanded
    Plan(
        logger=logger,
        node=reference_tree,
        run=plan.my_run,
        inherited_fmf_context=plan.fmf_context,
        inherited_environment=plan.environment,
        skip_validation=True,
    )
    ref = reference_tree.get("ref")
    logger.debug(f"Dynamic 'ref' resolved as '{ref}'.")
    return ref
