import copy
import re
from collections.abc import Iterable, Sequence
from typing import ClassVar, Optional, cast

from tmt._compat.pathlib import Path
from tmt.package_managers import (
    YUM_REPOS_DIR,
    FileSystemPath,
    Installable,
    Options,
    Package,
    PackageManager,
    PackageManagerEngine,
    Repository,
    Version,
    escape_installables,
    provides_package_manager,
)
from tmt.package_managers._rpm import RpmVersion
from tmt.utils import Command, CommandOutput, GeneralError, PrepareError, RunError, ShellScript


class DnfEngine(PackageManagerEngine):
    """
    Compose shell scripts that drive ``dnf`` on a guest.

    Methods of this class only build scripts and commands, running them
    is left to the caller. The class attributes below name the commands
    and options used, so that subclasses can retarget the same logic to
    other members of the ``dnf`` family.
    """

    #: Command used for regular package operations.
    _base_command = Command('dnf')
    #: Command used for installing debuginfo packages.
    _base_debuginfo_command = Command('debuginfo-install')

    #: Option added to the package command when ``skip_missing`` is requested.
    skip_missing_packages_option = '--skip-broken'
    #: Option added to the debuginfo command when ``skip_missing`` is requested.
    skip_missing_debuginfo_option = skip_missing_packages_option

    def prepare_command(self) -> tuple[Command, Command]:
        """
        Build the base command and its default options.

        :returns: a pair of the package manager command, prefixed with
            the guest's sudo prefix when one is set, and the default
            options (``-y``).
        """

        sudo_prefix = self.guest.facts.sudo_prefix

        if sudo_prefix:
            return (Command(sudo_prefix) + self._base_command, Command('-y'))

        return (self._base_command, Command('-y'))

    def _extra_dnf_options(self, options: Options, command: Optional[Command] = None) -> Command:
        """
        Collect additional options for ``yum``/``dnf`` based on given options.

        :param options: installation options to translate into
            command-line options.
        :param command: command the options are collected for, it
            decides which "skip missing" option is used. Defaults to the
            base package command.
        :returns: command-line options to add to the command.
        :raises GeneralError: when ``skip_missing`` is requested for
            a command that is neither the base package command nor the
            base debuginfo command.
        """

        command = command or self._base_command

        extra_options = Command()

        for package in options.excluded_packages:
            extra_options += Command('--exclude', package)

        if not options.skip_missing:
            return extra_options

        # Commands are told apart by their string form.
        command_id = str(command)

        if command_id == str(self._base_command):
            extra_options += Command(self.skip_missing_packages_option)

        elif command_id == str(self._base_debuginfo_command):
            extra_options += Command(self.skip_missing_debuginfo_option)

        else:
            raise GeneralError(f"Unhandled package manager command '{command}'.")

        return extra_options

    def _construct_presence_script(
        self, *installables: Installable, what_provides: bool = True
    ) -> ShellScript:
        """
        Build an ``rpm -q`` query for given installables.

        :param installables: what to query for.
        :param what_provides: when set, ``--whatprovides`` is added to
            the query, otherwise a plain ``rpm -q`` is used.
        :returns: script running the query.
        """

        query = 'rpm -q --whatprovides' if what_provides else 'rpm -q'

        return ShellScript(f'{query} {" ".join(escape_installables(*installables))}')

    def check_presence(self, *installables: Installable) -> ShellScript:
        """
        Build a script querying presence of given installables.

        :param installables: what to query for.
        :returns: script running an ``rpm -q --whatprovides`` query.
        """

        return self._construct_presence_script(*installables)

    def _construct_transaction_script(
        self, action: str, *installables: Installable, options: Options
    ) -> ShellScript:
        """
        Build ``<command> <action> <options> <installables>`` script.

        :param action: package manager subcommand, e.g. ``install``.
        :param installables: what the action should be applied to.
        :param options: options translated into extra command-line
            options.
        :returns: script running the action.
        """

        extra_options = self._extra_dnf_options(options)

        return ShellScript(
            f'{self.command.to_script()} {action} '
            f'{self.options.to_script()} {extra_options} '
            f'{" ".join(escape_installables(*installables))}'
        )

    def _construct_install_script(
        self, *installables: Installable, options: Optional[Options] = None
    ) -> ShellScript:
        """
        Build a script installing given installables.

        :param installables: what to install.
        :param options: installation options. With ``check_first`` set,
            a presence query is combined with the installation.
        :returns: script performing the installation.
        """

        options = options or Options()

        script = self._construct_transaction_script('install', *installables, options=options)

        if options.check_first:
            # NOTE(review): `|` presumably chains scripts with shell `||`,
            # i.e. install only when the presence query fails - confirm
            # against `ShellScript`.
            script = self._construct_presence_script(*installables) | script

        return script

    def _construct_reinstall_script(
        self, *installables: Installable, options: Optional[Options] = None
    ) -> ShellScript:
        """
        Build a script reinstalling given installables.

        :param installables: what to reinstall.
        :param options: installation options. With ``check_first`` set,
            a presence query is combined with the reinstallation.
        :returns: script performing the reinstallation.
        """

        options = options or Options()

        script = self._construct_transaction_script('reinstall', *installables, options=options)

        if options.check_first:
            # NOTE(review): `&` presumably chains scripts with shell `&&`,
            # i.e. reinstall only when the presence query succeeds - confirm
            # against `ShellScript`.
            script = self._construct_presence_script(*installables) & script

        return script

    def _construct_install_debuginfo_script(
        self, *installables: Installable, options: Optional[Options] = None
    ) -> ShellScript:
        """
        Build a script installing debuginfo for given installables.

        The script starts with the bare debuginfo command, the sudo
        prefix is not added here.

        :param installables: packages to install debuginfo for.
        :param options: installation options.
        :returns: script running the debuginfo command.
        """

        options = options or Options()

        extra_options = self._extra_dnf_options(options, command=self._base_debuginfo_command)

        return ShellScript(
            f'{self._base_debuginfo_command.to_script()} '
            f'{self.options.to_script()} {extra_options} '
            f'{" ".join(escape_installables(*installables))}'
        )

    def refresh_metadata(self) -> ShellScript:
        """
        Build a script refreshing the package manager metadata.

        :returns: script running ``makecache`` with ``--refresh``.
        """

        command = self.command.to_script()
        options = self.options.to_script()

        return ShellScript(f'{command} makecache {options} --refresh')

    def enable_repo(self, *repo_ids: str) -> ShellScript:
        """
        Build a script enabling given repositories.

        :param repo_ids: IDs of repositories to enable.
        :returns: script running ``config-manager --enable``.
        """

        config_manager = Command('config-manager', '--enable', *repo_ids)

        return (self.command + config_manager).to_script()

    def disable_repo(self, *repo_ids: str) -> ShellScript:
        """
        Build a script disabling given repositories.

        :param repo_ids: IDs of repositories to disable.
        :returns: script running ``config-manager --disable``.
        """

        config_manager = Command('config-manager', '--disable', *repo_ids)

        return (self.command + config_manager).to_script()

    def install(
        self,
        *installables: Installable,
        options: Optional[Options] = None,
    ) -> ShellScript:
        """
        Build a script installing given installables.

        :param installables: what to install.
        :param options: installation options.
        :returns: script performing the installation.
        """

        return self._construct_install_script(*installables, options=options)

    def reinstall(
        self,
        *installables: Installable,
        options: Optional[Options] = None,
    ) -> ShellScript:
        """
        Build a script reinstalling given installables.

        :param installables: what to reinstall.
        :param options: installation options.
        :returns: script performing the reinstallation.
        """

        return self._construct_reinstall_script(*installables, options=options)

    def install_debuginfo(
        self,
        *installables: Installable,
        options: Optional[Options] = None,
    ) -> ShellScript:
        """
        Build a script installing debuginfo for given installables.

        When the debuginfo command is the standalone
        ``debuginfo-install``, the script first installs
        ``/usr/bin/debuginfo-install``. Unless ``skip_missing`` is
        requested, the script ends with an ``rpm -q`` query for the
        ``<installable>-debuginfo`` packages.

        :param installables: packages to install debuginfo for.
        :param options: installation options.
        :returns: script performing the installation.
        """

        options = options or Options()

        debuginfo_script = cast(  # type: ignore[redundant-cast]
            ShellScript,
            self._construct_install_debuginfo_script(  # type: ignore[reportGeneralIssues,unused-ignore]
                *installables, options=options
            ),
        )

        # Make sure debuginfo-install is present on the target system.
        # Commands are compared by their string form, the same way
        # `_extra_dnf_options` tells them apart, so the decision does not
        # depend on `Command` implementing equality.
        if str(self._base_debuginfo_command) == str(Command('debuginfo-install')):
            script = self.install(FileSystemPath('/usr/bin/debuginfo-install'))

            script &= debuginfo_script

        else:
            script = debuginfo_script

        # Extra ignore/check for yum to workaround BZ#1920176
        if not options.skip_missing:
            script &= cast(  # type: ignore[redundant-cast]
                ShellScript,
                self._construct_presence_script(  # type: ignore[reportGeneralIssues,unused-ignore]
                    *tuple(Package(f'{installable}-debuginfo') for installable in installables),
                    what_provides=False,
                ),
            )

        return script

    def install_repository(self, repository: Repository) -> ShellScript:
        """
        Build a script writing a repository file into the repos directory.

        :param repository: repository whose ``content`` is written into
            a file named ``filename`` under ``YUM_REPOS_DIR``.
        :returns: script feeding the content to ``tee`` via a quoted
            here-document.
        """

        repo_path = YUM_REPOS_DIR / repository.filename
        sudo_prefix = self.guest.facts.sudo_prefix

        return ShellScript(f"{sudo_prefix} tee {repo_path} <<'EOF'\n{repository.content}\nEOF")

    def list_packages(self, repository: Repository) -> ShellScript:
        """
        Build a script listing packages available in a repository.

        All repositories are disabled and only those of ``repository``
        are enabled for the query. Each output line has the form
        ``<repoid>;<name>-<epoch>:<version>-<release>.<arch>``.

        :param repository: repository to list packages of.
        :returns: script running the ``repoquery``.
        """

        enabled_repos = [f'--enablerepo={repo_id}' for repo_id in repository.repo_ids]

        query = Command(
            'repoquery',
            '--disablerepo=*',
            *enabled_repos,
            '--queryformat',
            r'%{repoid};%{name}-%{epoch}:%{version}-%{release}.%{arch}\n',
        )

        return (self.command + query).to_script()

    def get_package_origin(self, packages: Iterable[str]) -> ShellScript:
        """
        Build a script reporting which repository packages came from.

        Each output line has the form ``<name> <from_repo>``.

        :param packages: names of installed packages to query.
        :returns: script running the ``repoquery --installed``.
        """

        query = Command(
            'repoquery',
            '--installed',
            '--queryformat',
            r'%{name} %{from_repo}\n',
            *[Package(name) for name in packages],
        )

        return (self.command + query).to_script()

    def resolve_provides(
        self,
        provides: Sequence[str],
        repo_ids: Iterable[str] = (),
    ) -> ShellScript:
        """
        Build a script resolving provides into packages providing them.

        The script loops over provides and, for each, prints a
        ``'<provide>':`` header followed by ``nevra``/``repo_id``
        entries for every package ``repoquery --whatprovides`` reports.

        :param provides: provides to resolve, must not be empty.
        :param repo_ids: when given, the query is limited to these
            repositories.
        :returns: script performing the resolution.
        """

        assert provides, "provides must not be empty"

        provides_str = ' '.join(escape_installables(*[Package(name) for name in provides]))

        query = Command(
            'repoquery',
            '--queryformat',
            r"- nevra: '%{full_nevra}'\n  repo_id: '%{repoid}'\n",
            *[f'--repo={repo_id}' for repo_id in repo_ids],
            '--whatprovides',
        )
        cmd = (self.command + query).to_script()

        return ShellScript(f"""
        for _provide in {provides_str}; do
            echo "'$_provide':"
            {cmd} "$_provide"
        done
        """)

    def create_repository(self, directory: Path) -> ShellScript:
        """
        Create repository metadata for package files in the given directory.

        :param directory: The path to the directory containing RPM packages.
        :returns: A shell script to create repository metadata.
        """

        return ShellScript(f"createrepo {directory}")


@provides_package_manager('dnf')
class Dnf(PackageManager[DnfEngine]):
    """
    Package manager plugin for ``dnf``.

    Scripts are built by the engine (:py:class:`DnfEngine`), this class
    runs them on the guest and interprets their output.
    """

    NAME = 'dnf'

    _engine_class = DnfEngine

    #: Package name of the COPR plugin for this package manager.
    copr_plugin: ClassVar[str] = 'dnf-plugins-core'

    #: Package name of the config-manager plugin for this package manager.
    config_manager_plugin: ClassVar[str] = 'dnf-command(config-manager)'

    # Compiled regex patterns for DNF/YUM error messages
    _FAILED_PACKAGE_INSTALLATION_PATTERNS = [
        re.compile(r'Unable to find a match:\s+([^\s\n]+)', re.IGNORECASE),
        re.compile(r'No match for argument:\s+([^\s\n]+)', re.IGNORECASE),
        re.compile(r'No package\s+([^\s\n]+)\s+available', re.IGNORECASE),
        re.compile(r'Could not find a package for:\s*([^\s]+)', re.IGNORECASE),
    ]

    # NOTE(review): presumably marks the package manager as usable for
    # building bootc images - confirm against the consumers of this flag.
    bootc_builder = True

    # Succeeds when `dnf` exists and its `--version` output does not
    # contain "dnf5 version".
    probe_command = ShellScript(
        """
        type dnf && ((dnf --version | grep -E 'dnf5 version') && exit 1 || exit 0)
        """
    ).to_shell_command()
    # The priority of preference: `rpm-ostree` > `dnf5` > `dnf` > `yum`.
    # `rpm-ostree` has its own implementation and its own priority, and
    # the `dnf` family just stays below it.
    probe_priority = 50

    def list_packages(self, repository: Repository) -> list[Version]:
        """
        List packages available in a repository.

        :param repository: repository to query.
        :returns: versions parsed from ``<repoid>;<nevra>`` lines of the
            query output, blank lines are skipped.
        :raises GeneralError: when the query provided no output.
        """

        output = self.guest.execute(self.engine.list_packages(repository))

        if output.stdout is None:
            raise GeneralError("Repository query provided no output")

        versions: list[Version] = []

        for raw_line in output.stdout.strip().splitlines():
            entry = raw_line.strip()

            if not entry:
                continue

            repo_id, nevra = entry.split(';', maxsplit=1)
            versions.append(RpmVersion.from_nevra(nevra, repo_id=repo_id))

        return versions

    def check_presence(self, *installables: Installable) -> dict[Installable, bool]:
        """
        Check which of given installables are present on the guest.

        Output lines of the presence query are paired with installables
        in order. An installable is reported as missing when its line
        matches one of the known "not installed" messages, and as
        present otherwise.

        :param installables: what to check.
        :returns: mapping of installables to their presence. Installables
            for which the query printed no line are not included.
        :raises GeneralError: when the query provided no output.
        """

        try:
            stdout = self.guest.execute(self.engine.check_presence(*installables)).stdout

        except RunError as exc:
            # Output of a failed query is still parsed below.
            stdout = exc.stdout

        if stdout is None:
            raise GeneralError("rpm presence check provided no output")

        results: dict[Installable, bool] = {}

        for line, installable in zip(stdout.strip().splitlines(), installables):
            name = re.escape(str(installable))

            missing_patterns = (
                # Packages not installed, when "rpm -q PACKAGE" used
                rf'package {name} is not installed',
                # Provided rpm capabilities (packages, commands, etc.),
                # when "rpm -q --whatprovides CAPABILITY" used
                rf'no package provides {name}',
                # Filesystem paths, when "rpm -q --whatprovides PATH" used
                rf'error: file {name}: No such file or directory',
            )

            results[installable] = not any(
                re.match(pattern, line) is not None for pattern in missing_patterns
            )

        return results

    def assert_config_manager(self) -> None:
        """
        Make sure the config-manager plugin is installed.
        """

        self.debug('Make sure the config-manager plugin is available.')
        self.install(Package(self.config_manager_plugin))

    def enable_copr(self, *repositories: str) -> None:
        """
        Enable requested copr repositories

        The copr plugin is installed first. Nothing is done when no
        repositories are given.

        :param repositories: copr repositories to enable.
        """

        if not repositories:
            return

        # Try to install copr plugin
        self.debug('Make sure the copr plugin is available.')
        self.install(Package(self.copr_plugin))

        command = self.engine.command.to_script()
        options = self.engine.options.to_script()

        for repository in repositories:
            self.info('copr', repository, 'green')
            self.guest.execute(ShellScript(f"{command} copr {options} enable -y {repository}"))

    def install_local(
        self,
        *installables: Installable,
        options: Optional[Options] = None,
    ) -> CommandOutput:
        """
        Install local packages, refreshing those already installed.

        :param installables: what to install.
        :param options: installation options. ``check_first`` is always
            disabled for this operation; the given object itself is
            left untouched.
        :returns: output of the ``install`` step.
        """

        # Work on a copy, the caller's options must not be modified.
        options = copy.copy(options) if options else Options()
        options.check_first = False

        # Use both install/reinstall to get all packages refreshed
        # FIXME Simplify this once BZ#1831022 is fixed/implemented.
        output = self.install(*installables, options=options)
        self.reinstall(*installables, options=options)

        return output

    def install_debuginfo(
        self,
        *installables: Installable,
        options: Optional[Options] = None,
    ) -> CommandOutput:
        """
        Install debuginfo packages for given installables.

        :param installables: packages to install debuginfo for.
        :param options: installation options. With ``skip_missing`` set,
            the final presence check is skipped.
        :returns: output of the installation.
        :raises GeneralError: when a ``<installable>-debuginfo`` package
            is reported as missing after the installation.
        """

        output = super().install_debuginfo(*installables, options=options)

        if options and options.skip_missing:
            return output

        # Check the packages are installed because 'debuginfo-install'
        # returns 0 even though it didn't manage to install the required packages
        presence = self.check_presence(*[Package(f'{p}-debuginfo') for p in installables])

        missing = [str(package) for package, present in presence.items() if not present]

        if missing:
            raise GeneralError(f"Debuginfo packages were not installed: {', '.join(missing)}.")

        return output


class Dnf5Engine(DnfEngine):
    """
    Compose shell scripts that drive ``dnf5`` on a guest.

    Only the commands and the "skip missing" options differ from
    :py:class:`DnfEngine`, all script construction is inherited.
    """

    _base_command = Command('dnf5')
    # Debuginfo installation is a `dnf5` subcommand here rather than
    # a standalone `debuginfo-install` command.
    _base_debuginfo_command = Command('dnf5', 'debuginfo-install')
    # Used for both regular and debuginfo installation when `skip_missing`
    # is requested.
    skip_missing_packages_option = '--skip-unavailable'
    skip_missing_debuginfo_option = skip_missing_packages_option


@provides_package_manager('dnf5')
class Dnf5(Dnf):
    """
    Package manager plugin for ``dnf5``.

    Behaves like :py:class:`Dnf`, with scripts built by
    :py:class:`Dnf5Engine` and with ``dnf5``-specific plugin packages.
    """

    NAME = 'dnf5'

    _engine_class = Dnf5Engine

    # Names of `dnf5` packages providing the copr and config-manager plugins.
    copr_plugin: ClassVar[str] = 'dnf5-command(copr)'
    config_manager_plugin: ClassVar[str] = 'dnf5-command(config-manager)'

    # Probed by running `dnf5 --version`. The priority is above the one
    # of `Dnf` (50) and `Yum` (40).
    probe_command = Command('dnf5', '--version')
    probe_priority = 60


class YumEngine(DnfEngine):
    """
    Compose shell scripts that drive ``yum`` on a guest.

    Builds on :py:class:`DnfEngine`, replacing the pieces ``yum`` handles
    differently: repository management goes through
    ``yum-config-manager``, installation scripts carry a workaround for
    BZ#1920176, and provides resolution and package origin queries are
    not supported.
    """

    _base_command = Command('yum')

    def _yum_config_manager_command(self) -> Command:
        """
        Build the ``yum-config-manager`` command.

        :returns: the command, prefixed with the guest's sudo prefix
            when one is set.
        """

        config_manager = Command('yum-config-manager')

        if not self.guest.facts.sudo_prefix:
            return config_manager

        return Command(self.guest.facts.sudo_prefix) + config_manager

    def enable_repo(self, *repo_ids: str) -> ShellScript:
        """
        Build a script enabling given repositories.

        :param repo_ids: IDs of repositories to enable.
        :returns: script running ``yum-config-manager --enable``.
        """

        arguments = Command('--enable', *repo_ids)

        return (self._yum_config_manager_command() + arguments).to_script()

    def disable_repo(self, *repo_ids: str) -> ShellScript:
        """
        Build a script disabling given repositories.

        :param repo_ids: IDs of repositories to disable.
        :returns: script running ``yum-config-manager --disable``.
        """

        arguments = Command('--disable', *repo_ids)

        return (self._yum_config_manager_command() + arguments).to_script()

    def resolve_provides(
        self,
        provides: Sequence[str],
        repo_ids: Iterable[str] = (),
    ) -> ShellScript:
        """
        Provides resolution is not supported by ``yum``.

        :raises PrepareError: always.
        """

        raise PrepareError("Package manager 'yum' does not support provides resolution.")

    def get_package_origin(self, packages: Iterable[str]) -> ShellScript:
        """
        Package origin query is not implemented for ``yum``.

        :raises NotImplementedError: always.
        """

        # Real yum 3.x (not a dnf symlink) ships repoquery as a separate
        # yum-utils plugin and the %{from_repo} queryformat field is not
        # guaranteed to be available.  Support can be added once tested on
        # an actual yum 3.x system (RHEL 6 / CentOS 6 era).
        raise NotImplementedError

    # TODO: get rid of those `type: ignore` below. I think it's caused by the
    # decorator, it might be messing with the class inheritance as seen by pyright,
    # but mypy sees no issue, pytest sees no issue, everything works. Silencing
    # for now.
    def _apply_presence_workaround(
        self,
        script: ShellScript,
        installables: tuple[Installable, ...],
        options: Options,
    ) -> ShellScript:
        """
        Extend an (re)installation script with the BZ#1920176 workaround.

        :param script: the (re)installation script to extend.
        :param installables: what the script (re)installs.
        :param options: installation options. With ``skip_missing`` set,
            ``/bin/true`` is attached to the script, otherwise a presence
            query for the installables is.
        :returns: the extended script.
        """

        # Extra ignore/check for yum to workaround BZ#1920176
        if options.skip_missing:
            script |= ShellScript('/bin/true')

            return script

        script &= cast(  # type: ignore[redundant-cast]
            ShellScript,
            self._construct_presence_script(  # type: ignore[reportGeneralIssues,unused-ignore]
                *installables
            ),
        )

        return script

    def install(
        self, *installables: Installable, options: Optional[Options] = None
    ) -> ShellScript:
        """
        Build a script installing given installables.

        :param installables: what to install.
        :param options: installation options.
        :returns: installation script extended with the BZ#1920176
            workaround.
        """

        options = options or Options()

        install_script = cast(  # type: ignore[redundant-cast]
            ShellScript,
            self._construct_install_script(  # type: ignore[reportGeneralIssues,unused-ignore]
                *installables, options=options
            ),
        )

        return self._apply_presence_workaround(install_script, installables, options)

    def reinstall(
        self, *installables: Installable, options: Optional[Options] = None
    ) -> ShellScript:
        """
        Build a script reinstalling given installables.

        :param installables: what to reinstall.
        :param options: installation options.
        :returns: reinstallation script extended with the BZ#1920176
            workaround.
        """

        options = options or Options()

        reinstall_script = cast(  # type: ignore[redundant-cast]
            ShellScript,
            self._construct_reinstall_script(  # type: ignore[reportGeneralIssues,unused-ignore]
                *installables, options=options
            ),
        )

        return self._apply_presence_workaround(reinstall_script, installables, options)

    def refresh_metadata(self) -> ShellScript:
        """
        Build a script refreshing the package manager metadata.

        :returns: script running ``makecache``, without any extra options.
        """

        command = self.command.to_script()

        return ShellScript(f'{command} makecache')


@provides_package_manager('yum')
class Yum(Dnf):
    """
    Package manager plugin for ``yum``.

    Behaves like :py:class:`Dnf`, with scripts built by
    :py:class:`YumEngine` and with ``yum``-specific plugin packages.
    """

    NAME = 'yum'

    _engine_class = YumEngine

    # Names of packages providing the copr plugin and `yum-config-manager`.
    copr_plugin: ClassVar[str] = 'yum-plugin-copr'
    config_manager_plugin: ClassVar[str] = 'yum-utils'

    # NOTE(review): presumably `yum` cannot be used for building bootc
    # images, unlike `Dnf` which sets this to `True` - confirm against the
    # consumers of this flag.
    bootc_builder = False

    # Succeeds when `yum` exists and its `--version` output does not
    # contain "dnf5 version". The priority is below the one of `Dnf` (50)
    # and `Dnf5` (60).
    probe_command = ShellScript(
        """
        type yum && ((yum --version | grep -E 'dnf5 version') && exit 1 || exit 0)
        """
    ).to_shell_command()
    probe_priority = 40
