import copy
import shutil
from typing import Any, Callable, Optional, TypeVar, cast

import click
import fmf

import tmt
import tmt.base.core
import tmt.checks
import tmt.log
import tmt.steps
import tmt.steps.discover
import tmt.utils
import tmt.utils.git
from tmt._compat.typing import Self
from tmt.container import (
    SerializableContainer,
    SpecBasedContainer,
    container,
    field,
    option_to_key,
)
from tmt.steps import _RawStepData
from tmt.steps.prepare.distgit import insert_to_prepare_step
from tmt.utils import (
    Command,
    Path,
    ShellScript,
)

T = TypeVar('T', bound='TestDescription')


class _RawDiscoverShellData(_RawStepData):
    tests: Optional[list[dict[str, Any]]]


@container
class TestDescription(
    SpecBasedContainer[dict[str, Any], dict[str, Any]],
    tmt.utils.NormalizeKeysMixin,
    SerializableContainer,
):
    """
    Keys necessary to describe a shell-based test.

    Provides basic functionality for transition between "raw" step data representation,
    which consists of keys and values given by fmf tree and CLI options, and this
    container representation for internal use.
    """

    name: str

    # TODO: following keys are copy & pasted from base.Test. It would be much, much better
    # to reuse the definitions from base.Test instead copying them here, but base.Test
    # does not support save/load operations. This is a known issue, introduced by a patch
    # transitioning step data to data classes, it is temporary, and it will be fixed as
    # soon as possible - nobody wants to keep two very same lists of attributes.
    test: ShellScript = field(
        default=ShellScript(''),
        normalize=lambda key_address, raw_value, logger: ShellScript(raw_value),
        serialize=lambda test: str(test),
        unserialize=lambda serialized_test: ShellScript(serialized_test),
    )

    # Core attributes (supported across all levels)
    summary: Optional[str] = None
    description: Optional[str] = None
    enabled: bool = True
    order: int = field(
        default=tmt.steps.PHASE_ORDER_DEFAULT,
        normalize=lambda key_address, raw_value, logger: (
            50 if raw_value is None else int(raw_value)
        ),
    )
    link: Optional[tmt.base.core.Links] = field(
        default=None,
        normalize=lambda key_address, raw_value, logger: tmt.base.core.Links(data=raw_value),
        # Using `to_spec()` on purpose: `Links` does not provide serialization
        # methods, because specification of links is already good enough. We
        # can use existing `to_spec()` method, and undo it with a simple
        # `Links(...)` call.
        serialize=lambda link: link.to_spec() if link else None,
        unserialize=lambda serialized_link: tmt.base.core.Links(data=serialized_link),
    )
    id: Optional[str] = None
    tag: list[str] = field(
        default_factory=list,
        normalize=tmt.utils.normalize_string_list,
    )
    tier: Optional[str] = field(
        default=None,
        normalize=lambda key_address, raw_value, logger: (
            None if raw_value is None else str(raw_value)
        ),
    )
    adjust: Optional[list[tmt.base.core._RawAdjustRule]] = field(
        default=None,
        normalize=lambda key_address, raw_value, logger: (
            []
            if raw_value is None
            else ([raw_value] if not isinstance(raw_value, list) else raw_value)
        ),
    )

    # Basic test information
    author: list[str] = field(
        default_factory=list,
        normalize=tmt.utils.normalize_string_list,
    )
    contact: list[str] = field(
        default_factory=list,
        normalize=tmt.utils.normalize_string_list,
    )
    component: list[str] = field(
        default_factory=list,
        normalize=tmt.utils.normalize_string_list,
    )

    # Test execution data
    path: Optional[str] = None
    framework: Optional[str] = None
    manual: bool = False
    tty: bool = False
    require: list[tmt.base.core.Dependency] = field(
        default_factory=list,
        normalize=tmt.base.core.normalize_require,
        serialize=lambda requires: [require.to_spec() for require in requires],
        unserialize=lambda serialized_requires: [
            tmt.base.core.dependency_factory(require) for require in serialized_requires
        ],
    )
    recommend: list[tmt.base.core.Dependency] = field(
        default_factory=list,
        normalize=tmt.base.core.normalize_require,
        serialize=lambda recommends: [recommend.to_spec() for recommend in recommends],
        unserialize=lambda serialized_recommends: [
            tmt.base.core.DependencySimple.from_spec(recommend)
            if isinstance(recommend, str)
            else tmt.base.core.DependencyFmfId.from_spec(recommend)
            for recommend in serialized_recommends
        ],
    )
    environment: tmt.utils.Environment = field(
        default_factory=tmt.utils.Environment,
        normalize=tmt.utils.Environment.normalize,
        serialize=lambda environment: environment.to_fmf_spec(),
        unserialize=lambda serialized: tmt.utils.Environment.from_fmf_spec(serialized),
        exporter=lambda environment: environment.to_fmf_spec(),
    )
    check: list[tmt.checks.Check] = field(
        default_factory=list,
        normalize=tmt.checks.normalize_test_checks,
        serialize=lambda checks: [check.to_spec() for check in checks],
        unserialize=lambda serialized: [
            tmt.checks.Check.from_spec(**check) for check in serialized
        ],
        exporter=lambda value: [check.to_minimal_spec() for check in value],
    )
    duration: str = '1h'
    result: str = 'respect'

    # ignore[override]: expected, we do want to accept more specific
    # type than the one declared in superclass.
    @classmethod
    def from_spec(  # type: ignore[override]
        cls, raw_data: dict[str, Any], logger: tmt.log.Logger
    ) -> Self:
        """
        Convert from a specification file or from a CLI option
        """

        data = cls(name=raw_data['name'], test=raw_data['test'])
        data._load_keys(raw_data, cls.__name__, logger)

        return data

    def to_spec(self) -> dict[str, Any]:
        """
        Convert to a form suitable for saving in a specification file
        """

        data = super().to_spec()
        data['link'] = self.link.to_spec() if self.link else None
        data['require'] = [require.to_spec() for require in self.require]
        data['recommend'] = [recommend.to_spec() for recommend in self.recommend]
        data['check'] = [check.to_spec() for check in self.check]
        data['test'] = str(self.test)

        return data

    def to_minimal_spec(self) -> dict[str, Any]:
        data = {key: value for key, value in self.items() if value not in (None, [], {})}

        # Some fields need special handling.
        # Map them to functions that will correctly convert them.
        field_map: dict[str, Callable[[Any], Any]] = {
            'link': lambda link: link.to_spec(),
            'require': lambda requires: [require.to_spec() for require in requires],
            'recommend': lambda recommends: [recommend.to_spec() for recommend in recommends],
            'check': lambda checks: [check.to_spec() for check in checks],
            'test': str,
        }

        for key, transform in field_map.items():
            value = getattr(self, option_to_key(key), None)
            if value is not None:
                value = transform(value)
            # Do not include empty values
            if value in (None, [], {}):
                data.pop(key, None)
            else:
                data[key] = value

        return data


@container
class DiscoverShellData(tmt.steps.discover.DiscoverStepData):
    tests: list[TestDescription] = field(
        default_factory=list,
        normalize=lambda key_address, raw_value, logger: [
            TestDescription.from_spec(raw_datum, logger)
            for raw_datum in cast(list[dict[str, Any]], raw_value)
        ],
        serialize=lambda tests: [test.to_serialized() for test in tests],
        unserialize=lambda serialized_tests: [
            TestDescription.from_serialized(serialized_test)
            for serialized_test in serialized_tests
        ],
    )

    keep_git_metadata: bool = field(
        option="--keep-git-metadata",
        is_flag=True,
        default=False,
        help="""
            By default the ``.git`` directory is removed to save disk space.
            Set to ``true`` to sync the git metadata to guest as well.
            Implicit if ``dist-git-source`` is used.
            """,
    )

    def to_spec(self) -> _RawDiscoverShellData:
        """
        Convert to a form suitable for saving in a specification file
        """

        return cast(
            _RawDiscoverShellData,
            {**super().to_spec(), 'tests': [test.to_spec() for test in self.tests]},
        )

    def to_minimal_spec(self) -> _RawDiscoverShellData:
        spec = {**super().to_minimal_spec()}
        spec.pop('tests', None)
        if self.tests:
            spec['tests'] = [test.to_minimal_spec() for test in self.tests]
        return cast(_RawDiscoverShellData, spec)


@tmt.steps.provides_method('shell')
class DiscoverShell(tmt.steps.discover.DiscoverPlugin[DiscoverShellData]):
    """
    Use provided list of shell script tests.

    List of test cases to be executed can be defined manually directly
    in the plan as a list of dictionaries containing test ``name`` and
    actual ``test`` script. It is also possible to define here any other
    test metadata such as the ``duration`` or a ``path`` to the test.
    The default duration for tests defined directly in the discover step
    is ``1h``.

    Example config:

    .. code-block:: yaml

        discover:
            how: shell
            tests:
              - name: /help/main
                test: tmt --help
              - name: /help/test
                test: tmt test --help
              - name: /help/smoke
                test: ./smoke.sh
                path: /tests/shell

    For DistGit repo one can download sources and use code from them in
    the tests. Sources are extracted into ``$TMT_SOURCE_DIR`` path,
    patches are applied by default. See options to install build
    dependencies or to just download sources without applying patches.
    To apply patches, special ``prepare`` phase with order ``60`` is
    added, and ``prepare`` step has to be enabled for it to run.

    .. code-block:: yaml

        discover:
            how: shell
            dist-git-source: true
            tests:
              - name: /upstream
                test: cd $TMT_SOURCE_DIR/*/tests && make test

    To clone a remote repository and use it as a source specify ``url``.
    It accepts also ``ref`` to checkout provided reference. Dynamic
    reference feature is supported as well.

    .. code-block:: yaml

        discover:
            how: shell
            url: https://github.com/teemtee/tmt.git
            ref: "1.18.0"
            tests:
              - name: first test
                test: ./script-from-the-repo.sh
    """

    _data_class = DiscoverShellData

    _tests: list[tmt.base.core.Test] = []

    def show(self, keys: Optional[list[str]] = None) -> None:
        """
        Show config details
        """

        super().show([])

        if self.data.tests:
            click.echo(tmt.utils.format('tests', [test.name for test in self.data.tests]))

    def checkout_ref(self) -> None:
        super().checkout_ref()

        keep_git_metadata = True if self.data.dist_git_source else self.data.keep_git_metadata
        if not keep_git_metadata:
            # Remove .git so that it's not copied to the SUT
            shutil.rmtree(self.test_dir / '.git', ignore_errors=True)

    def _fetch_local_repository(self) -> Optional[Path]:
        assert self.step.plan.worktree  # narrow type

        # Symlink tests directory to the plan work tree
        relative_path = self.step.plan.worktree.relative_to(self.phase_workdir)
        self.test_dir.symlink_to(relative_path)

        # Git metadata are necessary for dist_git_source
        keep_git_metadata = True if self.data.dist_git_source else self.data.keep_git_metadata
        if keep_git_metadata and self.step.plan.fmf_root:
            # Copy .git which is excluded when worktree is initialized
            # If exists, git_root can be only the same or parent of fmf_root
            git_root = tmt.utils.git.git_root(
                fmf_root=self.step.plan.fmf_root, logger=self._logger
            )
            if git_root:
                if git_root != self.step.plan.fmf_root:
                    raise tmt.utils.DiscoverError(
                        "The 'keep-git-metadata' option can be "
                        "used only when fmf root is the same as git root."
                    )

                with self.tmpdir(prefix='rsync-') as rsync_tempdir:
                    self.run(
                        Command(
                            "rsync",
                            "-ar",
                            '--temp-dir',
                            rsync_tempdir,
                            f"{git_root}/.git",
                            self.test_dir,
                        )
                    )
        return None

    def go(self, *, path: Optional[Path] = None, logger: Optional[tmt.log.Logger] = None) -> None:
        """
        Discover available tests
        """

        super().go(path=path, logger=logger)
        tests = fmf.Tree({'summary': 'tests'})

        self.log_import_plan_details()

        # Check and process each defined shell test
        for data in self.data.tests:
            # Create data copy (we want to keep original data for save()
            data = copy.deepcopy(data)
            # Extract name, make sure it is present
            # TODO: can this ever happen? With annotations, `name: str` and `test: str`, nothing
            # should ever assign `None` there and pass the test.
            if not data.name:
                raise tmt.utils.SpecificationError(
                    f"Missing test name in '{self.step.plan.name}'."
                )
            # Make sure that the test script is defined
            if not data.test:
                raise tmt.utils.SpecificationError(
                    f"Missing test script in '{self.step.plan.name}'."
                )
            # Apply default test duration unless provided
            if not data.duration:
                data.duration = tmt.base.core.DEFAULT_TEST_DURATION_L2

            # Create a simple fmf node, with correct name. Emit only keys and values
            # that are no longer default. Do not add `name` itself into the node,
            # it's not a supported test key, and it's given to the node itself anyway.
            # Note the exception for `duration` key - it's expected in the output
            # even if it still has its default value.
            test_fmf_keys: dict[str, Any] = {
                key: value
                for key, value in data.to_spec().items()
                if key != 'name' and (key == 'duration' or value != data.default(key))
            }
            tests.child(data.name, test_fmf_keys)

        if self.data.dist_git_source:
            assert self.step.plan.my_run is not None  # narrow type
            assert self.step.plan.my_run.tree is not None  # narrow type
            assert self.step.plan.my_run.tree.root is not None  # narrow type
            fmf_root = self.test_dir if self.data.url else self.step.plan.my_run.tree.root
            git_root = tmt.utils.git.git_root(
                fmf_root=fmf_root,
                logger=self._logger,
            )
            if not git_root:
                raise tmt.utils.DiscoverError(f"Directory '{fmf_root}' is not a git repository.")
            try:
                self.download_distgit_source(
                    distgit_dir=git_root,
                    target_dir=self.source_dir,
                    handler_name=self.data.dist_git_type,
                )
                # Copy rest of files so TMT_SOURCE_DIR has patches, sources and spec file
                # FIXME 'worktree' could be used as source_dir when 'url' is not set
                shutil.copytree(git_root, self.source_dir, symlinks=True, dirs_exist_ok=True)

                if self.data.dist_git_download_only:
                    self.debug("Do not extract sources as 'download_only' is set.")
                else:
                    # Check if prepare is enabled, warn user if not
                    if not self.step.plan.prepare.enabled:
                        self.warn("Sources will not be extracted, prepare step is not enabled.")
                    insert_to_prepare_step(
                        discover_plugin=self,
                        sourcedir=self.source_dir,
                    )

            except Exception as error:
                raise tmt.utils.DiscoverError("Failed to process 'dist-git-source'.") from error

        # Use a tmt.Tree to apply possible command line filters
        self._tests = tmt.Tree(logger=self._logger, tree=tests).tests(
            conditions=["manual is False"], sort=False
        )

    def tests(
        self, *, phase_name: Optional[str] = None, enabled: Optional[bool] = None
    ) -> list[tmt.steps.discover.TestOrigin]:
        if phase_name is not None and phase_name != self.name:
            return []

        if enabled is None:
            return [
                tmt.steps.discover.TestOrigin(test=test, phase=self.name) for test in self._tests
            ]

        return [
            tmt.steps.discover.TestOrigin(test=test, phase=self.name)
            for test in self._tests
            if test.enabled is enabled
        ]
