from __future__ import annotations

import sys
from typing import TYPE_CHECKING

import pytest

from .. import _core
from ..testing import check_one_way_stream, wait_all_tasks_blocked

# Mark all the tests in this file as being windows-only: pytest skips every
# test in this module whenever sys.platform is not "win32".
pytestmark = pytest.mark.skipif(sys.platform != "win32", reason="windows only")

# TYPE_CHECKING is False at runtime, so this assertion always passes when the
# module is actually executed. Under a static type checker it can only hold on
# win32, which is what makes the checker skip this module on other platforms.
assert (  # Skip type checking when not on Windows
    sys.platform == "win32" or not TYPE_CHECKING
)

if sys.platform == "win32":
    from asyncio.windows_utils import pipe

    from .._core._windows_cffi import _handle, kernel32
    from .._windows_pipes import PipeReceiveStream, PipeSendStream


async def make_pipe() -> tuple[PipeSendStream, PipeReceiveStream]:
    """Create one pipe and return its two ends as ``(send, receive)`` streams."""
    read_handle, write_handle = pipe()
    send_stream = PipeSendStream(write_handle)
    receive_stream = PipeReceiveStream(read_handle)
    return send_stream, receive_stream


def test_pipe_typecheck() -> None:
    # Neither stream class may be constructed from a float or from None;
    # both constructors are expected to raise TypeError for such arguments.
    float_handle = 1.0
    with pytest.raises(TypeError):
        PipeSendStream(float_handle)  # type: ignore[arg-type]

    missing_handle = None
    with pytest.raises(TypeError):
        PipeReceiveStream(missing_handle)  # type: ignore[arg-type]


async def test_pipe_error_on_close() -> None:
    # A failing kernel32.CloseHandle must surface as an OSError from aclose().
    invalid_handle_message = r"^\[WinError 6\] The handle is invalid$"

    read_handle, write_handle = pipe()
    send_stream = PipeSendStream(write_handle)
    receive_stream = PipeReceiveStream(read_handle)

    # Close both raw handles behind the streams' backs so that the streams'
    # own close attempts operate on handles that are no longer valid.
    assert kernel32.CloseHandle(_handle(read_handle))
    assert kernel32.CloseHandle(_handle(write_handle))

    with pytest.raises(OSError, match=invalid_handle_message):
        await send_stream.aclose()

    with pytest.raises(OSError, match=invalid_handle_message):
        await receive_stream.aclose()


async def test_pipes_combined() -> None:
    # Push several large buffers through the pipe from one task while another
    # task drains it in small reads, then check that every byte arrived.
    send_stream, receive_stream = await make_pipe()
    payload_size = 2**20
    repetitions = 3

    async def send_everything() -> None:
        async with send_stream:
            payload = bytearray(payload_size)
            for _ in range(repetitions):
                await send_stream.send_all(payload)

    async def receive_everything() -> None:
        async with receive_stream:
            # Hold off reading until the other tasks have blocked.
            await wait_all_tasks_blocked()
            byte_total = 0
            # 5000 is chosen because it doesn't evenly divide 2**20
            chunk_length = len(await receive_stream.receive_some(5000))
            while chunk_length:
                byte_total += chunk_length
                chunk_length = len(await receive_stream.receive_some(5000))

            # An empty read ended the loop; everything sent must be counted.
            assert byte_total == payload_size * repetitions

    async with _core.open_nursery() as nursery:
        nursery.start_soon(send_everything)
        nursery.start_soon(receive_everything)


async def test_async_with() -> None:
    # Leaving an ``async with`` block must close both streams, after which
    # any further send or receive raises ClosedResourceError.
    send_stream, receive_stream = await make_pipe()
    async with send_stream:
        async with receive_stream:
            pass

    with pytest.raises(_core.ClosedResourceError):
        await send_stream.send_all(b"")

    with pytest.raises(_core.ClosedResourceError):
        await receive_stream.receive_some(10)


async def test_close_during_write() -> None:
    # Closing the send stream while another task is in the middle of writing
    # must make that task fail with ClosedResourceError.
    # NOTE(review): the receive end is unused but stays bound for the whole
    # test - presumably to keep the read side of the pipe open; confirm.
    send_stream, _receive_stream = await make_pipe()
    block = b"x" * 4096

    async def write_forever() -> None:
        with pytest.raises(_core.ClosedResourceError) as excinfo:
            while True:
                await send_stream.send_all(block)
        # The error message should say that a different task did the closing.
        assert "another task" in str(excinfo.value)

    async with _core.open_nursery() as nursery:
        nursery.start_soon(write_forever)
        # Wait for the writer to block before closing the stream under it.
        await wait_all_tasks_blocked(0.1)
        await send_stream.aclose()


async def test_pipe_fully() -> None:
    # Run the generic one-way-stream checks against our pipe pair. Supplying
    # a make_clogged_pipe would also exercise wait_send_all_might_not_block,
    # and we can't implement that on Windows, so None is passed instead.
    clogged_stream_maker = None
    await check_one_way_stream(make_pipe, clogged_stream_maker)
