import os
from shlex import quote
from typing import Any, ClassVar, Optional, Union, cast

import tmt
import tmt.guest
import tmt.log
import tmt.steps
import tmt.steps.provision
import tmt.utils
from tmt.container import container, field
from tmt.guest import (
    DEFAULT_PUSH_OPTIONS,
    GuestCapability,
    RebootMode,
    TransferOptions,
)
from tmt.steps.provision import Provision
from tmt.utils import (
    Command,
    OnProcessEndCallback,
    OnProcessStartCallback,
    Path,
    ShellScript,
    retry,
)
from tmt.utils.hints import get_hint
from tmt.utils.wait import Deadline, Waiting

# Timeout in seconds of waiting for a connection; used as the reconnect
# deadline after a hard reboot when the caller does not provide one.
CONNECTION_TIMEOUT = 60

# Defaults
# Image used when the plan does not specify one
DEFAULT_IMAGE = "fedora"
# User passed to ``podman run --user``
DEFAULT_USER = "root"
# Number of attempts to pull the image
DEFAULT_PULL_ATTEMPTS = 5
# Seconds to wait between image pull attempts
DEFAULT_PULL_INTERVAL = 5
# Seconds passed to ``podman container stop --time``. Note that this is
# deliberately shorter than podman's own default stop time, which is 10s.
DEFAULT_STOP_TIME = 1


@container
class PodmanGuestData(tmt.guest.GuestData):
    """
    Guest data describing a podman container guest

    Collects the keys accepted by the ``container`` provision method:
    which image or existing container to use, how to pull the image,
    how to stop the container, which network prefix to apply and which
    devices to expose.
    """

    # Image to start a new container from; not used when an existing
    # container is given via ``container``.
    image: str = field(
        default=DEFAULT_IMAGE,
        option=('-i', '--image'),
        metavar='IMAGE',
        help='Select image to use. Short name or complete url.',
    )
    # Override parent class with our defaults
    user: str = field(
        default=DEFAULT_USER,
        option=('-u', '--user'),
        metavar='USERNAME',
        help="""
             Username to pass to ``podman run`` command when starting the
             container representing the guest. It will override ``USER``
             directive from the image. Unless ``become`` is used, tests
             and other user-provided commands will be invoked under this
             user account.
             """,
    )
    # Pull the image even when it is already available locally
    force_pull: bool = field(
        default=False,
        option=('-p', '--pull', '--force-pull'),
        is_flag=True,
        help='Force pulling a fresh container image.',
    )

    # Existing container to use; when unset, the guest starts a new
    # container and stores its generated name here.
    container: Optional[str] = field(
        default=None,
        option=('-c', '--container'),
        metavar='NAME',
        help='Name or id of an existing container to be used.',
    )
    # Name of the podman network the container is attached to; filled in
    # by the guest when it sets the network up. Marked ``internal``,
    # presumably to keep it from being exposed as a user-facing key.
    network: Optional[str] = field(
        default=None,
        internal=True,
    )

    # Optional string prepended to the generated network name
    network_prefix: Optional[str] = field(
        default=None,
        option='--network-prefix',
        metavar='PREFIX',
        help="""
             Custom prefix for container network names to avoid collisions
             between multiple simultaneous tmt invocations.
             """,
    )

    # Retry policy for pulling the image: number of attempts ...
    pull_attempts: int = field(
        default=DEFAULT_PULL_ATTEMPTS,
        option='--pull-attempts',
        metavar='COUNT',
        help=f"""
             How many times to try pulling the image,
             {DEFAULT_PULL_ATTEMPTS} attempts by default.
             """,
        normalize=tmt.utils.normalize_int,
    )

    # ... and the pause between them, in seconds.
    pull_interval: int = field(
        default=DEFAULT_PULL_INTERVAL,
        option='--pull-interval',
        metavar='SECONDS',
        help=f"""
             How long to wait before a new pull attempt,
             {DEFAULT_PULL_INTERVAL} seconds by default.
             """,
        normalize=tmt.utils.normalize_int,
    )

    # Value of ``--time`` given to ``podman container stop``
    stop_time: int = field(
        default=DEFAULT_STOP_TIME,
        option='--stop-time',
        metavar='SECONDS',
        help=f"""
             How long to wait before forcibly stopping the container,
             {DEFAULT_STOP_TIME} seconds by default.
             """,
        normalize=tmt.utils.normalize_int,
    )

    # Devices handed to ``podman run`` via ``--device``, one option per
    # device; they are checked against the allowlist by the provision
    # plugin before the guest is created.
    expose_device: list[str] = field(
        default_factory=list,
        option='--expose-device',
        multiple=True,
        metavar='DEVICE',
        help="""
             Device to expose to the container (e.g., ``/dev/kvm``, ``/dev/ttyS3``).
             Can be specified multiple times.
             """,
        normalize=tmt.utils.normalize_string_list,
    )


@container
class ProvisionPodmanData(PodmanGuestData, tmt.steps.provision.ProvisionStepData):
    """
    Data of the ``container`` provision plugin

    Combines the podman guest keys with the provision step data, no
    keys of its own are added.
    """

    pass


class GuestContainer(tmt.Guest):
    """
    Container Instance
    """

    _data_class = PodmanGuestData
    NETWORK_NAME_FORMAT: ClassVar[str] = "{prefix}tmt-{run_name}-{plan_name}-network"

    image: Optional[str]
    container: Optional[str]
    user: str
    force_pull: bool
    parent: tmt.steps.Step
    pull_attempts: int
    pull_interval: int
    stop_time: int
    network_prefix: Optional[str]
    expose_device: list[str]
    logger: tmt.log.Logger

    @property
    def is_ready(self) -> bool:
        """
        Report whether the container backing the guest is running

        :returns: ``False`` when no container is set, otherwise whether
            ``podman container inspect`` reports the container as
            running.
        """

        if self.container is None:
            return False

        # Ask podman for the JSON-encoded ``.State.Running`` flag
        inspect_command = Command(
            'container',
            'inspect',
            '--format',
            '{{json .State.Running}}',
            self.container,
        )
        reported_state = str(self.podman(inspect_command).stdout).strip()

        return reported_state == 'true'

    def wake(self) -> None:
        """
        Wake up the guest

        No action is taken on the container itself, the wake-up is only
        recorded in the debug log.
        """

        message = f"Waking up container '{self.container}'."
        self.debug(message, level=2, shift=0)

    def pull_image(self) -> None:
        """
        Pull the guest image with ``podman pull``

        The pull itself is unconditional. Whether it is needed, i.e. the
        image is missing or a pull was forced, is decided by the caller.
        """

        image = self.image
        assert image is not None  # narrow type

        pull_command = Command('pull', '-q', image)
        self.podman(pull_command, message=f"Pull image '{image}'.")

    def _setup_network(self) -> list[str]:
        """
        Prepare the podman network shared by container guests of the plan

        The network name is built from the optional custom prefix, the
        name of the run workdir and the pathless safe name of the plan,
        and it is stored in ``self.network``. The network is then
        created; a failure reporting that the network already exists is
        tolerated, which lets all container guests of one plan share a
        single network and talk to each other.

        :returns: ``podman run`` arguments attaching the container to
            the network.
        :raises tmt.utils.RunError: when creating the network fails for
            any reason other than the network already existing.
        """

        provision = self.parent
        assert isinstance(provision, Provision)  # narrow type

        # Run and plan names keep networks of simultaneously running plans
        # apart while staying readable when debugging, the optional prefix
        # adds one more level of collision avoidance.
        network_name = self.NETWORK_NAME_FORMAT.format(
            prefix=self.network_prefix or '',
            run_name=provision.run_workdir.name,
            plan_name=provision.plan.pathless_safe_name,
        )
        self.network = network_name

        try:
            self.podman(
                Command('network', 'create', network_name),
                message=f"Create network '{network_name}'.",
            )
        except tmt.utils.RunError as error:
            # error string:
            # https://github.com/containers/common/blob/main/libnetwork/types/define.go#L19
            if not (error.stderr and 'network already exists' in error.stderr):
                raise error

            self.debug(f"Network '{network_name}' already exists.", level=3)

        return ['--network', network_name]

    def _setup_environment(self) -> list[str]:
        """
        Hand plan environment variables over to ``podman run``

        The variables are stored in a file in the guest workdir, one
        ``KEY=VALUE`` pair per line, which is the format podman's
        ``--env-file`` option reads.

        .. note::

            The file is NOT a standard dotenv file. Podman's env-file
            parser supports neither quoted nor multiline values,
            everything following the first ``=`` character is taken
            literally. See
            https://github.com/containers/podman/blob/main/cmd/podman/parse/net.go

        :returns: ``--env-file`` arguments for ``podman run``, or an
            empty list when there is no variable to pass.
        """

        assert isinstance(self.parent, Provision)  # narrow type

        plan_environment = self.parent.plan.environment

        if not plan_environment:
            return []

        # One line equals one variable in podman's env-file format, hence
        # a value containing a newline cannot be represented and the
        # variable is dropped with a warning.
        passable = tmt.utils.Environment()

        for name, value in plan_environment.items():
            if '\n' not in value:
                passable[name] = value
                continue

            self.warn(
                f"Environment variable '{name}' contains a newline character. "
                "Podman's env-file format does not support multiline values, "
                "skipping this variable."
            )

        if not passable:
            return []

        # No quoting is applied, podman takes the values literally
        lines = [f'{name}={value}' for name, value in passable.items()]

        # The file lives in the guest workdir
        env_file = self.guest_workdir / 'podman-run-environment'
        env_file.write_text('\n'.join(lines))

        self.debug(f"Podman run environment file written to '{env_file}'.")

        return ['--env-file', str(env_file)]

    def start(self) -> None:
        """
        Start the provisioned guest

        When an existing container was requested, it is merely adopted
        as the guest address. Otherwise the image is pulled when it is
        missing or when a pull was forced, the shared network and the
        environment file are prepared, and a new detached container is
        started with the plan workdir mounted inside. Nothing is done in
        dry mode.
        """

        if self.is_dry_run:
            return

        if self.container:
            # An existing container was given, adopt it and be done
            self.primary_address = self.container
            self.topology_address = self.container

            self.verbose('primary address', self.primary_address, 'green')
            self.verbose('topology address', self.topology_address, 'green')

            return

        # A new container is needed, name it after the guest
        container_name = self._tmt_name()
        self.container = container_name
        self.primary_address = container_name
        self.topology_address = container_name

        self.verbose('primary address', self.primary_address, 'green')
        self.verbose('topology address', self.topology_address, 'green')

        assert self.image is not None

        # Find out whether the image is available locally
        try:
            self.podman(
                Command('image', 'exists', self.image),
                message=f"Check for container image '{self.image}'.",
            )
        except tmt.utils.RunError:
            image_missing = True
        else:
            image_missing = False

        # Retry pulling the image in case of network issues
        # Temporary solution until configurable in podman itself
        if image_missing or self.force_pull:
            retry(
                self.pull_image,
                self.pull_attempts,
                self.pull_interval,
                f"Pulling '{self.image}' image",
                self._logger,
            )

        self.verbose('name', self.container, 'green')

        # Network arguments go first, the environment file follows
        run_args: list[str] = [*self._setup_network(), *self._setup_environment()]

        # Devices were already validated by the provision plugin before
        # the guest was created.
        for device in self.expose_device:
            run_args += ['--device', device]

        self.debug(f"Start container '{self.image}'.")
        assert self.container is not None

        # Mount the whole plan directory in the container
        workdir_mount = f'{self.plan_workdir}:{self.plan_workdir}:z'

        run_command = Command(
            'run',
            *run_args,
            '--name',
            self.container,
            '-v',
            workdir_mount,
            '-itd',
            '--user',
            self.user,
            self.image,
        )
        self.podman(run_command)

        self.assert_reachable()

    def reboot(
        self,
        mode: RebootMode = RebootMode.SOFT,
        command: Optional[Union[Command, ShellScript]] = None,
        waiting: Optional[Waiting] = None,
    ) -> bool:
        """
        Reboot the guest, and wait for the guest to recover.

        .. note::

           The plugin supports :py:attr:`RebootMode.HARD` only, which is
           implemented by restarting the container. Any other mode, with
           or without a custom reboot command, ends with an exception.

        :param mode: which boot mode to perform.
        :param command: a command to run on the guest to trigger the
            reboot. Only usable when mode is not
            :py:attr:`RebootMode.HARD`, and never supported by this
            plugin.
        :param waiting: deadline for the reboot. When not given, the
            default reconnect waiting limited to
            ``CONNECTION_TIMEOUT`` seconds is used.
        :returns: ``True`` if the reboot succeeded, ``False`` otherwise.
        :raises tmt.utils.ProvisionError: when there is no container to
            restart, or when a custom command was given for a mode other
            than hard.
        :raises tmt.guest.RebootModeNotSupportedError: when a mode other
            than hard was requested without a custom command.
        """

        # Turn down everything but the hard reboot first
        if mode != RebootMode.HARD:
            if command:
                raise tmt.utils.ProvisionError(
                    "Custom reboot command not supported in podman provision."
                )

            raise tmt.guest.RebootModeNotSupportedError(
                f"Guest '{self.multihost_name}' does not support {mode.value} reboot."
                " Containers can only be stopped and started again (hard reboot).",
                guest=self,
                mode=mode,
            )

        if self.container is None:
            raise tmt.utils.ProvisionError("No container initialized.")

        if waiting is None:
            waiting = tmt.guest.default_reconnect_waiting()
            waiting.deadline = Deadline.from_seconds(CONNECTION_TIMEOUT)

        self.debug("Hard reboot using the reboot command 'container restart'.")

        restart_command = Command('container', 'restart', self.container)
        self.podman(restart_command)

        return self.reconnect(waiting)

    def _run_ansible(
        self,
        playbook: tmt.guest.AnsibleApplicable,
        playbook_root: Optional[Path] = None,
        extra_args: Optional[str] = None,
        friendly_command: Optional[str] = None,
        log: Optional[tmt.log.LoggingFunction] = None,
        silent: bool = False,
    ) -> tmt.utils.CommandOutput:
        """
        Run an Ansible playbook on the guest.

        The playbook is applied by ``ansible-playbook`` using the
        ``podman`` connection, with the container as the only inventory
        host. When not running as root, the whole command is wrapped in
        ``podman unshare``. The command runs in the plan worktree.

        :param playbook: path to the playbook to run.
        :param playbook_root: if set, ``playbook`` path must be located
            under the given root path.
        :param extra_args: additional arguments to be passed to ``ansible-playbook``
            via ``--extra-args``.
        :param friendly_command: if set, it would be logged instead of the
            command itself, to improve visibility of the command in logging output.
        :param log: a logging function to use for logging of command output. By
            default, ``logger.debug`` is used.
        :param silent: if set, logging of steps taken by this function would be
            reduced.
        :raises tmt.utils.RunError: when the command fails. The
            ``ansible-not-available`` hint is printed first if the
            failure output matches its patterns.
        """

        playbook = self._sanitize_ansible_playbook_path(playbook, playbook_root)

        # As non-root we must run with podman unshare
        needs_unshare = os.geteuid() != 0

        ansible_command = Command()

        if needs_unshare:
            ansible_command += ['podman', 'unshare']

        # TODO: add ansible inventory support
        playbook_invocation = cast(
            tmt.utils.RawCommand,
            [
                'ansible-playbook',
                *self._ansible_verbosity(),
                *self._ansible_extra_args(extra_args),
                '-c',
                'podman',
                '-i',
                f'{self.container},',
                playbook,
            ],
        )
        ansible_command += playbook_invocation

        try:
            return self._run_guest_command(
                ansible_command,
                cwd=self.parent.plan.worktree,
                env=self._prepare_command_environment(),
                friendly_command=friendly_command,
                log=log,
                silent=silent,
            )
        except tmt.utils.RunError as error:
            # Help the user when the failure looks like a missing Ansible
            ansible_hint = get_hint('ansible-not-available', ignore_missing=False)

            if ansible_hint.search_cli_patterns(error.stderr, error.stdout, error.message):
                ansible_hint.print(self._logger)

            raise error

    def podman(
        self,
        command: Command,
        silent: bool = True,
        **kwargs: Any,
    ) -> tmt.utils.CommandOutput:
        """
        Run given command via podman

        :param command: podman subcommand and its arguments, ``podman``
            itself is prepended.
        :param silent: passed to the command runner, enabled by default.
        :param kwargs: passed to the command runner as they are.
        :returns: output of the command.
        :raises tmt.utils.ProvisionError: when the failure message says
            the ``podman`` or ``ansible-playbook`` executable was not
            found.
        :raises tmt.utils.RunError: on any other command failure.
        """

        # Messages revealing the required tooling is not installed
        missing_tool_markers = (
            "File 'podman' not found.",
            "File 'ansible-playbook' not found.",
        )

        podman_command = Command('podman') + command

        try:
            return self._run_guest_command(podman_command, silent=silent, **kwargs)
        except tmt.utils.RunError as error:
            if any(marker in error.message for marker in missing_tool_markers):
                raise tmt.utils.ProvisionError(
                    "Install 'tmt+provision-container' to provision using this method."
                ) from error

            raise error

    def execute(
        self,
        command: Union[tmt.utils.Command, tmt.utils.ShellScript],
        cwd: Optional[Path] = None,
        env: Optional[tmt.utils.Environment] = None,
        friendly_command: Optional[str] = None,
        test_session: bool = False,
        immediately: bool = True,
        tty: bool = False,
        silent: bool = False,
        log: Optional[tmt.log.LoggingFunction] = None,
        interactive: bool = False,
        on_process_start: Optional[OnProcessStartCallback] = None,
        on_process_end: Optional[OnProcessEndCallback] = None,
        sourced_files: Optional[list[Path]] = None,
        **kwargs: Any,
    ) -> tmt.utils.CommandOutput:
        """
        Execute given command or shell script in the container

        Environment exports, the optional change of directory, sourcing
        of the requested files and the command itself are joined into a
        single shell script, which is then run by ``podman exec``.

        :param command: command or shell script to run.
        :param cwd: if set, the script changes to this directory first.
        :param env: environment given to the preparation of the command
            environment, the result is exported by the script.
        :param friendly_command: logged in place of the actual command,
            defaults to the string form of ``command``.
        :param test_session: if set, a pseudo-terminal is requested.
        :param immediately: not used by this implementation.
        :param tty: if set, a pseudo-terminal is requested.
        :param silent: passed to the podman runner.
        :param log: logging function for the command output, defaults
            to the verbose command logger of the guest.
        :param interactive: if set, both a pseudo-terminal and the
            interactive mode of ``podman exec`` are requested.
        :param on_process_start: passed to the podman runner.
        :param on_process_end: passed to the podman runner.
        :param sourced_files: files sourced before the command is run.
        :param kwargs: passed to the podman runner as they are.
        :raises tmt.utils.ProvisionError: when there is no container and
            dry mode is not enabled.
        """

        if not (self.container or self.is_dry_run):
            raise tmt.utils.ProvisionError('Could not execute without provisioned container.')

        # Accumulate all necessary commands - they will form a "shell" script, a single
        # string passed to a shell executed inside the container.
        script = ShellScript.from_scripts(
            self._prepare_command_environment(env).to_shell_exports()
        )

        # Change to given directory on guest if cwd provided
        if cwd is not None:
            script += ShellScript(f'cd {quote(str(cwd))}')

        for sourced_file in sourced_files or ():
            script += ShellScript(f'source {quote(str(sourced_file))}')

        script += command.to_script() if isinstance(command, Command) else command

        exec_flags: list[str] = []

        # Force the use of a pseudo-terminal if requested or when running
        # a test. Without it, processes spawned by the test session would
        # keep running after `podman exec` completes, e.g. in the case of
        # a timeout.
        if test_session or tty or interactive:
            exec_flags.append('-t')

        # Run in interactive mode if requested
        if interactive:
            exec_flags.append('-i')

        # Note that we MUST run commands via bash, so variables
        # work as expected
        podman_command = Command('exec', *exec_flags, self.container or 'dry')
        podman_command += script.to_shell_command()

        return self.podman(
            podman_command,
            log=log or self._command_verbose_logger,
            friendly_command=friendly_command or str(command),
            silent=silent,
            interactive=interactive,
            on_process_start=on_process_start,
            on_process_end=on_process_end,
            **kwargs,
        )

    def push(
        self,
        source: Optional[Path] = None,
        destination: Optional[Path] = None,
        options: Optional[TransferOptions] = None,
        superuser: bool = False,
    ) -> None:
        """
        Make sure that the workdir has a correct selinux context

        When the runner supports SELinux, the plan workdir is relabeled
        to ``container_file_t``. On top of that, when both ``source``
        and ``destination`` are given, the source is copied into the
        container with ``podman cp``. Nothing is done when the guest is
        not ready.

        :param source: path to copy from, used together with
            ``destination`` only.
        :param destination: path in the container to copy to.
        :param options: transfer options, the default push options are
            used when not given. Only ``recursive`` is consulted here.
        :param superuser: not used by this implementation.
        """

        if not self.is_ready:
            return

        transfer_options = options or DEFAULT_PUSH_OPTIONS

        run = self.parent.plan.my_run
        assert run is not None  # narrow type

        # Relabel workdir to container_file_t if SELinux supported
        self.debug("Update selinux context of the run workdir.", level=3)

        runner_facts = run.runner.facts

        if runner_facts.has_selinux:
            relabel_command = Command(
                "chcon", "--recursive", "--type=container_file_t", self.plan_workdir
            )
            self._run_guest_command(relabel_command, shell=False, silent=True)

        # Data are copied with `podman cp` only when both ends are given
        if not (source and destination):
            return

        # Make sure we do not copy to a subfolder (/foo/bar/bar), see `man podman-cp`
        copy_source = f"{source}/." if transfer_options.recursive else f"{source}"

        # If running in toolbox, make sure to copy from the toolbox
        # container instead of localhost.
        toolbox_name: Optional[str] = None

        if runner_facts.is_toolbox:
            toolbox_name = runner_facts.toolbox_container_name

        if toolbox_name:
            copy_source = f"{toolbox_name}:{copy_source}"

        self.podman(Command("cp", copy_source, f"{self.container}:{destination}"))

    def pull(
        self,
        source: Optional[Path] = None,
        destination: Optional[Path] = None,
        options: Optional[TransferOptions] = None,
    ) -> None:
        """
        Nothing to be done to pull workdir

        No data is transferred by this method and all parameters are
        ignored. Containers started by this plugin get the plan workdir
        mounted as a volume, which is presumably why there is nothing
        to fetch.

        :param source: not used by this implementation.
        :param destination: not used by this implementation.
        :param options: not used by this implementation.
        """

        # The only effect is the readiness probe itself, which runs
        # ``podman container inspect`` when a container is set.
        if not self.is_ready:
            return

    def stop(self) -> None:
        """
        Stop provisioned guest

        The container is given ``stop_time`` seconds by ``podman
        container stop``. Nothing is done when there is no container.
        """

        if not self.container:
            return

        stop_command = Command(
            'container',
            'stop',
            '--time',
            str(self.stop_time),
            self.container,
        )
        self.podman(stop_command)

        self.info('container', 'stopped', 'green')

    def remove(self) -> None:
        """
        Remove the container and, when possible, its network

        The container is removed forcibly. The network is then removed
        as well; failures saying the network is still being used or is
        already gone are logged and ignored.

        :raises tmt.utils.RunError: when removing the network fails for
            any other reason.
        """

        if self.container:
            self.podman(Command('container', 'rm', '-f', self.container))
            self.info('container', 'removed', 'green')

        if not self.network:
            return

        # Expected failures mapped to the note logged instead of failing.
        # error string:
        # https://github.com/containers/podman/blob/main/libpod/define/errors.go#L180
        tolerated_failures = {
            'network is being used': f"Network '{self.network}' is being used, not removing.",
            'network not found': f"Network '{self.network}' has been already removed.",
        }

        # Will remove the network if there are no more containers attached to it.
        try:
            self.podman(
                Command('network', 'rm', self.network),
                message=f"Remove network '{self.network}'.",
            )
            self.info('container', 'network removed', 'green')
        except tmt.utils.RunError as error:
            for marker, note in tolerated_failures.items():
                if error.stderr and marker in error.stderr:
                    self.debug(note, level=3)
                    break
            else:
                raise error


@tmt.steps.provides_method(
    'container',
    installation_hint="""
        Make sure ``podman`` is installed and configured, it is required for container-backed
        guests provided by ``provision/container`` plugin.

        To quickly test ``podman`` functionality, you can try running ``podman images`` or
        ``podman run --rm -it fedora:latest``.

        * Users who installed tmt from system repositories should install
          ``tmt+provision-container`` package.
        * Users who installed tmt from PyPI should also install ``tmt+provision-container``
          package, as it will install required system dependencies. After doing so, they should
          install ``tmt[provision-container]`` extra.
    """,
)
class ProvisionPodman(tmt.steps.provision.ProvisionPlugin[ProvisionPodmanData]):
    """
    Create a new container using ``podman``.

    Example config:

    .. code-block:: yaml

        provision:
            how: container
            image: fedora:latest

    .. code-block:: yaml

        # Use an image with a non-root user with sudo privileges,
        # and run scripts with sudo.
        provision:
            how: container
            image: image with non-root user with sudo privileges
            user: tester
            become: true

    .. code-block:: yaml

        # Expose devices to the container (single or multiple)
        provision:
            how: container
            image: fedora:latest
            expose-device:
                - /dev/kvm
                - /dev/ttyS3

    In order to always pull the fresh container image use ``pull: true``.

    In order to run the container with different user as the default ``root``,
    use ``user: USER``.

    Use ``expose-device`` to expose host devices to the container. This is useful
    for cases like KVM acceleration (``expose-device: /dev/kvm``) or accessing
    serial devices. Multiple devices can be specified as a list.

    For security reasons, exposable devices need to be explicitly allowed by tmt
    runner, either via ``--exposable-runner-devices`` CLI option or the
    ``TMT_EXPOSABLE_RUNNER_DEVICES`` environment variable.

    Container-backed guests do not support soft reboots or custom reboot
    commands. Soft reboot or ``tmt-reboot -c ...`` will result in an
    error.
    """

    _data_class = ProvisionPodmanData
    _guest_class = GuestContainer

    _thread_safe = True

    # Guest instance
    _guest = None

    def default(self, option: str, default: Any = None) -> Any:
        """
        Return default data for given option

        The ``pull`` option is answered from the ``force_pull`` key of
        the plugin data, all other options are left to the parent class.
        """

        if option != 'pull':
            return super().default(option, default=default)

        return self.data.force_pull

    def _validate_device_against_allowlist(self, device: str) -> None:
        """
        Check a requested device against the allowlist of exposable devices

        :param device: device requested to be exposed to the container.
        :raises tmt.utils.SpecificationError: when the device matches
            none of the exposable runner device patterns.
        """

        # NOTE(review): ``match`` anchors at the start of the string only,
        # so a pattern also accepts any longer path it is a prefix of.
        # Confirm the patterns are written with that in mind.
        for pattern in self.exposable_runner_device_patterns:
            if pattern.match(device):
                return

        raise tmt.utils.SpecificationError(
            f"Device '{device}' cannot be exposed. The device is not in the security "
            "allowlist. Please configure the '--exposable-runner-devices' option or the "
            "'TMT_EXPOSABLE_RUNNER_DEVICES' environment variable."
        )

    def go(self, *, logger: Optional[tmt.log.Logger] = None) -> None:
        """
        Provision the container

        Collects guest data from the plugin, validates the requested
        devices against the allowlist, then creates, starts and sets up
        the container guest.

        :param logger: passed to the parent class implementation.
        :raises tmt.utils.SpecificationError: when a requested device is
            not allowed to be exposed.
        """

        super().go(logger=logger)

        # Prepare data for the guest instance
        data = PodmanGuestData.from_plugin(self)

        data.show(verbose=self.verbosity_level, logger=self._logger)

        # Hardware requirements are not enforced, the user is only warned
        if data.hardware and data.hardware.constraint:
            self.warn("The 'container' provision plugin does not support hardware requirements.")

        # Validate device access configuration before creating guest
        for device in data.expose_device:
            self._validate_device_against_allowlist(device)

        # Create a new GuestContainer instance and start it
        self._guest = GuestContainer(
            logger=self._logger,
            data=data,
            name=self.name,
            parent=self.step,
        )
        self._guest.start()
        self._guest.setup()

        # Mark both syslog capabilities as unavailable on the guest
        # TODO: this might be allowed with `--privileged`...
        self._guest.facts.capabilities[GuestCapability.SYSLOG_ACTION_READ_ALL] = False
        # ... while this seems to be forbidden completely.
        self._guest.facts.capabilities[GuestCapability.SYSLOG_ACTION_READ_CLEAR] = False
