"""These are the only functions that ever yield back to the task runner."""

from __future__ import annotations

import enum
import types

# Jedi gets mad in test_static_tool_sees_class_members if we use collections Callable
from typing import TYPE_CHECKING, Any, Callable, NoReturn, Union, cast

import attrs
import outcome

from . import _run

if TYPE_CHECKING:
    from collections.abc import Awaitable, Generator

    from typing_extensions import TypeAlias

    from ._run import Task

RaiseCancelT: TypeAlias = Callable[[], NoReturn]


# This class object is used as a singleton.
# Not exported in the trio._core namespace, but imported directly by _run.
class CancelShieldedCheckpoint:
    __slots__ = ()


# Not exported in the trio._core namespace, but imported directly by _run.
@attrs.frozen(slots=False)
class WaitTaskRescheduled:
    abort_func: Callable[[RaiseCancelT], Abort]


# Not exported in the trio._core namespace, but imported directly by _run.
@attrs.frozen(slots=False)
class PermanentlyDetachCoroutineObject:
    final_outcome: outcome.Outcome[object]


MessageType: TypeAlias = Union[
    type[CancelShieldedCheckpoint],
    WaitTaskRescheduled,
    PermanentlyDetachCoroutineObject,
    object,
]


# Helper for the bottommost 'yield'. You can't use 'yield' inside an async
# function, but you can inside a generator, and if you decorate your generator
# with @types.coroutine, then it's even awaitable. However, it's still not a
# real async function: in particular, it isn't recognized by
# inspect.iscoroutinefunction, and it doesn't trigger the unawaited coroutine
# tracking machinery. Since our traps are public APIs, we make them real async
# functions, and then this helper takes care of the actual yield:
@types.coroutine
def _real_async_yield(
    obj: MessageType,
) -> Generator[MessageType, None, None]:
    return (yield obj)


# Real yield value is from trio's main loop, but type checkers can't
# understand that, so we cast it to make type checkers understand.
_async_yield = cast(
    "Callable[[MessageType], Awaitable[outcome.Outcome[object]]]",
    _real_async_yield,
)


async def cancel_shielded_checkpoint() -> None:
    """Introduce a schedule point, but not a cancel point.

    This is *not* a :ref:`checkpoint <checkpoints>`, but it is half of a
    checkpoint, and when combined with :func:`checkpoint_if_cancelled` it can
    make a full checkpoint.

    Equivalent to (but potentially more efficient than)::

        with trio.CancelScope(shield=True):
            await trio.lowlevel.checkpoint()

    """
    (await _async_yield(CancelShieldedCheckpoint)).unwrap()


# Return values for abort functions
class Abort(enum.Enum):
    """:class:`enum.Enum` used as the return value from abort functions.

    See :func:`wait_task_rescheduled` for details.

    .. data:: SUCCEEDED
              FAILED

    """

    SUCCEEDED = 1
    FAILED = 2


# Should always return the type a Task "expects", unless you willfully reschedule it
# with a bad value.
async def wait_task_rescheduled(  # type: ignore[explicit-any]
    abort_func: Callable[[RaiseCancelT], Abort],
) -> Any:
    """Put the current task to sleep, with cancellation support.

    This is the lowest-level API for blocking in Trio. Every time a
    :class:`~trio.lowlevel.Task` blocks, it does so by calling this function
    (usually indirectly via some higher-level API).

    This is a tricky interface with no guard rails. If you can use
    :class:`ParkingLot` or the built-in I/O wait functions instead, then you
    should.

    Generally the way it works is that before calling this function, you make
    arrangements for "someone" to call :func:`reschedule` on the current task
    at some later point.

    Then you call :func:`wait_task_rescheduled`, passing in ``abort_func``, an
    "abort callback".

    (Terminology: in Trio, "aborting" is the process of attempting to
    interrupt a blocked task to deliver a cancellation.)

    There are two possibilities for what happens next:

    1. "Someone" calls :func:`reschedule` on the current task, and
       :func:`wait_task_rescheduled` returns or raises whatever value or error
       was passed to :func:`reschedule`.

    2. The call's context transitions to a cancelled state (e.g. due to a
       timeout expiring). When this happens, the ``abort_func`` is called. Its
       interface looks like::

           def abort_func(raise_cancel):
               ...
               return trio.lowlevel.Abort.SUCCEEDED  # or FAILED

       It should attempt to clean up any state associated with this call, and
       in particular, arrange that :func:`reschedule` will *not* be called
       later. If (and only if!) it is successful, then it should return
       :data:`Abort.SUCCEEDED`, in which case the task will automatically be
       rescheduled with an appropriate :exc:`~trio.Cancelled` error.

       Otherwise, it should return :data:`Abort.FAILED`. This means that the
       task can't be cancelled at this time, and still has to make sure that
       "someone" eventually calls :func:`reschedule`.

       At that point there are again two possibilities. You can simply ignore
       the cancellation altogether: wait for the operation to complete and
       then reschedule and continue as normal. (For example, this is what
       :func:`trio.to_thread.run_sync` does if cancellation is disabled.)
       The other possibility is that the ``abort_func`` does succeed in
       cancelling the operation, but for some reason isn't able to report that
       right away. (Example: on Windows, it's possible to request that an
       async ("overlapped") I/O operation be cancelled, but this request is
       *also* asynchronous – you don't find out until later whether the
       operation was actually cancelled or not.)  To report a delayed
       cancellation, then you should reschedule the task yourself, and call
       the ``raise_cancel`` callback passed to ``abort_func`` to raise a
       :exc:`~trio.Cancelled` (or possibly :exc:`KeyboardInterrupt`) exception
       into this task. Either of the approaches sketched below can work::

          # Option 1:
          # Catch the exception from raise_cancel and inject it into the task.
          # (This is what Trio does automatically for you if you return
          # Abort.SUCCEEDED.)
          trio.lowlevel.reschedule(task, outcome.capture(raise_cancel))

          # Option 2:
          # wait to be woken by "someone", and then decide whether to raise
          # the error from inside the task.
          outer_raise_cancel = None
          def abort(inner_raise_cancel):
              nonlocal outer_raise_cancel
              outer_raise_cancel = inner_raise_cancel
              TRY_TO_CANCEL_OPERATION()
              return trio.lowlevel.Abort.FAILED
          await wait_task_rescheduled(abort)
          if OPERATION_WAS_SUCCESSFULLY_CANCELLED:
              # raises the error
              outer_raise_cancel()

       In any case it's guaranteed that we only call the ``abort_func`` at most
       once per call to :func:`wait_task_rescheduled`.

    Sometimes, it's useful to be able to share some mutable sleep-related data
    between the sleeping task, the abort function, and the waking task. You
    can use the sleeping task's :data:`~Task.custom_sleep_data` attribute to
    store this data, and Trio won't touch it, except to make sure that it gets
    cleared when the task is rescheduled.

    .. warning::

       If your ``abort_func`` raises an error, or returns any value other than
       :data:`Abort.SUCCEEDED` or :data:`Abort.FAILED`, then Trio will crash
       violently. Be careful! Similarly, it is entirely possible to deadlock a
       Trio program by failing to reschedule a blocked task, or cause havoc by
       calling :func:`reschedule` too many times. Remember what we said up
       above about how you should use a higher-level API if at all possible?

    """
    return (await _async_yield(WaitTaskRescheduled(abort_func))).unwrap()


async def permanently_detach_coroutine_object(
    final_outcome: outcome.Outcome[object],
) -> object:
    """Permanently detach the current task from the Trio scheduler.

    Normally, a Trio task doesn't exit until its coroutine object exits. When
    you call this function, Trio acts like the coroutine object just exited
    and the task terminates with the given outcome. This is useful if you want
    to permanently switch the coroutine object over to a different coroutine
    runner.

    When the calling coroutine enters this function it's running under Trio,
    and when the function returns it's running under the foreign coroutine
    runner.

    You should make sure that the coroutine object has released any
    Trio-specific resources it has acquired (e.g. nurseries).

    Args:
      final_outcome (outcome.Outcome): Trio acts as if the current task exited
          with the given return value or exception.

    Returns or raises whatever value or exception the new coroutine runner
    uses to resume the coroutine.

    """
    if _run.current_task().child_nurseries:
        raise RuntimeError(
            "can't permanently detach a coroutine object with open nurseries",
        )
    return await _async_yield(PermanentlyDetachCoroutineObject(final_outcome))


async def temporarily_detach_coroutine_object(
    abort_func: Callable[[RaiseCancelT], Abort],
) -> object:
    """Temporarily detach the current coroutine object from the Trio
    scheduler.

    When the calling coroutine enters this function it's running under Trio,
    and when the function returns it's running under the foreign coroutine
    runner.

    The Trio :class:`Task` will continue to exist, but will be suspended until
    you use :func:`reattach_detached_coroutine_object` to resume it. In the
    mean time, you can use another coroutine runner to schedule the coroutine
    object. In fact, you have to – the function doesn't return until the
    coroutine is advanced from outside.

    Note that you'll need to save the current :class:`Task` object to later
    resume; you can retrieve it with :func:`current_task`. You can also use
    this :class:`Task` object to retrieve the coroutine object – see
    :data:`Task.coro`.

    Args:
      abort_func: Same as for :func:`wait_task_rescheduled`, except that it
          must return :data:`Abort.FAILED`. (If it returned
          :data:`Abort.SUCCEEDED`, then Trio would attempt to reschedule the
          detached task directly without going through
          :func:`reattach_detached_coroutine_object`, which would be bad.)
          Your ``abort_func`` should still arrange for whatever the coroutine
          object is doing to be cancelled, and then reattach to Trio and call
          the ``raise_cancel`` callback, if possible.

    Returns or raises whatever value or exception the new coroutine runner
    uses to resume the coroutine.

    """
    return await _async_yield(WaitTaskRescheduled(abort_func))


async def reattach_detached_coroutine_object(task: Task, yield_value: object) -> None:
    """Reattach a coroutine object that was detached using
    :func:`temporarily_detach_coroutine_object`.

    When the calling coroutine enters this function it's running under the
    foreign coroutine runner, and when the function returns it's running under
    Trio.

    This must be called from inside the coroutine being resumed, and yields
    whatever value you pass in. (Presumably you'll pass a value that will
    cause the current coroutine runner to stop scheduling this task.) Then the
    coroutine is resumed by the Trio scheduler at the next opportunity.

    Args:
      task (Task): The Trio task object that the current coroutine was
          detached from.
      yield_value (object): The object to yield to the current coroutine
          runner.

    """
    # This is a kind of crude check – in particular, it can fail if the
    # passed-in task is where the coroutine *runner* is running. But this is
    # an experts-only interface, and there's no easy way to do a more accurate
    # check, so I guess that's OK.
    if not task.coro.cr_running:
        raise RuntimeError("given task does not match calling coroutine")
    _run.reschedule(task, outcome.Value("reattaching"))
    value = await _async_yield(yield_value)
    assert value == outcome.Value("reattaching")
