from __future__ import annotations

import threading
from collections import deque
from collections.abc import Callable
from typing import TYPE_CHECKING, NoReturn

import attrs

from .. import _core
from .._util import NoPublicConstructor, final
from ._wakeup_socketpair import WakeupSocketpair

if TYPE_CHECKING:
    from typing_extensions import TypeVarTuple, Unpack

    PosArgsT = TypeVarTuple("PosArgsT")

Function = Callable[..., object]  # type: ignore[explicit-any]
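# A Job is one queued call: the callable plus the positional arguments to
# invoke it with.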
Job = tuple[Function, tuple[object, ...]]


@attrs.define
class EntryQueue:
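    """Queue of callbacks submitted to the run loop from the outside world.

    run_sync_soon may be called from arbitrary threads or from signal
    handlers; the queued callbacks are then executed inside Trio by the
    task() coroutine below.
    """
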
    # This used to use a queue.Queue, but that was broken, because Queues are
    # implemented in Python, and not reentrant -- so it was thread-safe, but
    # not signal-safe. deque is implemented in C, so each operation is atomic
    # WRT threads (and this is guaranteed in the docs), AND each operation is
    # atomic WRT signal delivery (signal handlers can run on either side, but
    # not *during* a deque operation). dict makes similar guarantees -- and
    # it's even ordered!
    queue: deque[Job] = attrs.Factory(deque)
    idempotent_queue: dict[Job, None] = attrs.Factory(dict)

    wakeup: WakeupSocketpair = attrs.Factory(WakeupSocketpair)
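    # Set to True (under the lock) once the run loop is shutting down; after
    # that, run_sync_soon raises RunFinishedError instead of queueing anything.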
    done: bool = False
    # Must be a reentrant lock, because it's acquired from signal handlers.
    # RLock is signal-safe as of CPython 3.2. NB that this does mean that the
    # lock is effectively *disabled* when we enter from signal context. The
    # way we use the lock, this is OK though, because when run_sync_soon is
    # called from a signal it's atomic WRT the main thread -- it just might
    # happen at some inconvenient place. But if you look at the one place
    # where the main thread holds the lock, it's just to make one assignment,
    # so that's atomic WRT a signal anyway.
    lock: threading.RLock = attrs.Factory(threading.RLock)

    async def task(self) -> None:
        assert _core.currently_ki_protected()
        # RLock has two implementations: a signal-safe version in _thread, and
        # a signal-UNsafe version in threading. We need the signal-safe
        # version. Python 3.2 and later should always use it anyway, but,
        # since the symptoms if this goes wrong are just "weird rare
        # deadlocks", let's add a little sanity check.
        # See:
        #     https://bugs.python.org/issue13697#msg237140
        assert self.lock.__class__.__module__ == "_thread"

        def run_cb(job: Job) -> None:
            # We run this with KI protection enabled; it's the callback's
            # job to disable it if it wants it disabled. Exceptions are
            # treated like system task exceptions (i.e., converted into
            # TrioInternalError and cause everything to shut down).
            sync_fn, args = job
            try:
                sync_fn(*args)
            except BaseException as exc:

                async def kill_everything(  # noqa: RUF029  # await not used
                    exc: BaseException,
                ) -> NoReturn:
                    raise exc

                try:
                    _core.spawn_system_task(kill_everything, exc)
                except RuntimeError:
                    # We're quite late in the shutdown process and the
                    # system nursery is already closed.
                    # TODO(2020-06): this is a gross hack and should
                    # be fixed soon when we address #1607.
                    parent_nursery = _core.current_task().parent_nursery
                    if parent_nursery is None:
                        raise AssertionError(
                            "Internal error: `parent_nursery` should never be `None`",
                        ) from exc  # pragma: no cover
                    parent_nursery.start_soon(kill_everything, exc)

        # This has to be carefully written to be safe in the face of new items
        # being queued while we iterate, and to do a bounded amount of work on
        # each pass:
        def run_all_bounded() -> None:
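            # range(len(...)) samples the queue length once, so jobs appended
            # by a callback during this pass wait for the next pass -- that is
            # what keeps the work per pass bounded.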
            for _ in range(len(self.queue)):
                run_cb(self.queue.popleft())
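            # list() snapshots the current keys, so entries added to
            # idempotent_queue while callbacks run also wait for the next pass.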
            for job in list(self.idempotent_queue):
                del self.idempotent_queue[job]
                run_cb(job)

        try:
            while True:
                run_all_bounded()
                if not self.queue and not self.idempotent_queue:
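                    # Nothing left to do: sleep until run_sync_soon wakes us
                    # up through the wakeup socketpair.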
                    await self.wakeup.wait_woken()
                else:
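                    # More jobs arrived while we were running callbacks; yield
                    # at a checkpoint, then come back for another pass.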
                    await _core.checkpoint()
        except _core.Cancelled:
            # Keep the work done with this lock held as minimal as possible,
            # because it doesn't protect us against concurrent signal delivery
            # (see the comment above). Notice that this code would still be
            # correct if written like:
            #   self.done = True
            #   with self.lock:
            #       pass
            # because all we want is to force run_sync_soon
            # to either be completely before or completely after the write to
            # done. That's why we don't need the lock to protect
            # against signal handlers.
            with self.lock:
                self.done = True
            # No more jobs will be submitted, so just clear out any residual
            # ones:
            run_all_bounded()
            assert not self.queue
            assert not self.idempotent_queue

    def close(self) -> None:
        self.wakeup.close()

    def size(self) -> int:
        return len(self.queue) + len(self.idempotent_queue)

    def run_sync_soon(
        self,
        sync_fn: Callable[[Unpack[PosArgsT]], object],
        *args: Unpack[PosArgsT],
        idempotent: bool = False,
    ) -> None:
        with self.lock:
            if self.done:
                raise _core.RunFinishedError("run() has exited")
            # We have to hold the lock all the way through here, because
            # otherwise the main thread might exit *while* we're doing these
            # calls, and then our queue item might not be processed, or the
            # wakeup call might trigger an OSError b/c the IO manager has
            # already been shut down.
            if idempotent:
                self.idempotent_queue[sync_fn, args] = None
            else:
                self.queue.append((sync_fn, args))
            self.wakeup.wakeup_thread_and_signal_safe()


@final
@attrs.define(eq=False)
class TrioToken(metaclass=NoPublicConstructor):
    """An opaque object representing a single call to :func:`trio.run`.

    It has no public constructor; instead, see :func:`current_trio_token`.

    This object has two uses:

    1. It lets you re-enter the Trio run loop from external threads or signal
       handlers. This is the low-level primitive that :func:`trio.to_thread`
       and `trio.from_thread` use to communicate with worker threads, that
       `trio.open_signal_receiver` uses to receive notifications about
       signals, and so forth.

    2. Each call to :func:`trio.run` has exactly one associated
       :class:`TrioToken` object, so you can use it to identify a particular
       call.

    """

    _reentry_queue: EntryQueue

    def run_sync_soon(
        self,
        sync_fn: Callable[[Unpack[PosArgsT]], object],
        *args: Unpack[PosArgsT],
        idempotent: bool = False,
    ) -> None:
        """Schedule a call to ``sync_fn(*args)`` to occur in the context of a
        Trio task.

        This is safe to call from the main thread, from other threads, and
        from signal handlers. This is the fundamental primitive used to
        re-enter the Trio run loop from outside of it.

        The call will happen "soon", but there's no guarantee about exactly
        when, and no mechanism provided for finding out when it's happened.
        If you need this, you'll have to build your own.

        The call is effectively run as part of a system task (see
        :func:`~trio.lowlevel.spawn_system_task`). In particular this means
        that:

        * :exc:`KeyboardInterrupt` protection is *enabled* by default; if
          you want ``sync_fn`` to be interruptible by control-C, then you
          need to use :func:`~trio.lowlevel.disable_ki_protection`
          explicitly.

        * If ``sync_fn`` raises an exception, then it's converted into a
          :exc:`~trio.TrioInternalError` and *all* tasks are cancelled. You
          should be careful that ``sync_fn`` doesn't crash.

        All calls with ``idempotent=False`` are processed in strict
        first-in first-out order.

        If ``idempotent=True``, then ``sync_fn`` and ``args`` must be
        hashable, and Trio will make a best-effort attempt to discard any
        call submission which is equal to an already-pending call. Trio
        will process these in first-in first-out order.

        Any ordering guarantees apply separately to ``idempotent=False``
        and ``idempotent=True`` calls; there's no rule for how calls in the
        different categories are ordered with respect to each other.
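
        A minimal usage sketch (``report`` and ``worker`` are illustrative
        helpers, not part of Trio's API): capture the token inside the run
        loop, hand it to a thread, and let the thread schedule a callback
        back onto the run loop:

        .. code-block:: python

           import threading

           import trio

           def report(msg: str) -> None:
               # Runs inside the Trio run loop, with KI protection enabled.
               print("from the thread:", msg)

           def worker(token: trio.lowlevel.TrioToken) -> None:
               # Runs in an ordinary thread, outside Trio.
               token.run_sync_soon(report, "hello")

           async def main() -> None:
               token = trio.lowlevel.current_trio_token()
               thread = threading.Thread(target=worker, args=(token,))
               thread.start()
               await trio.sleep(1)  # plenty of time for the callback to run
               thread.join()

           trio.run(main)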

        :raises trio.RunFinishedError:
              if the associated call to :func:`trio.run`
              has already exited. (Any call that *doesn't* raise this error
              is guaranteed to be fully processed before :func:`trio.run`
              exits.)

        """
        self._reentry_queue.run_sync_soon(sync_fn, *args, idempotent=idempotent)
