/* * Copyright (c) 2023, Andreas Kling * Copyright (c) 2024-2025, stasoid * Copyright (c) 2025, ayeteadoe * Copyright (c) 2025, Ryszard Goc * * SPDX-License-Identifier: BSD-2-Clause */ #include #include #include #include #include #include #include #include #include #include struct OwnHandle { HANDLE handle = NULL; explicit OwnHandle(HANDLE h = NULL) : handle(h) { } OwnHandle(OwnHandle&& h) { handle = h.handle; h.handle = NULL; } // This operation can only be done when handle is NULL OwnHandle& operator=(OwnHandle&& other) { VERIFY(!handle); if (this == &other) return *this; handle = other.handle; other.handle = NULL; return *this; } ~OwnHandle() { if (handle) CloseHandle(handle); } bool operator==(OwnHandle const& h) const { return handle == h.handle; } bool operator==(HANDLE h) const { return handle == h; } }; template<> struct Traits : DefaultTraits { static unsigned hash(OwnHandle const& h) { return Traits::hash(h.handle); } }; template<> constexpr bool IsHashCompatible = true; namespace Core { enum class CompletionType : u8 { Wake, Timer, Notifer, }; struct CompletionPacket { CompletionType type; }; struct EventLoopWake final : CompletionPacket { OwnHandle wait_packet; OwnHandle wait_event; }; struct EventLoopTimer final : CompletionPacket { ~EventLoopTimer() { CancelWaitableTimer(timer.handle); } OwnHandle timer; OwnHandle wait_packet; bool is_periodic; WeakPtr owner; }; struct EventLoopNotifier final : CompletionPacket { ~EventLoopNotifier() { } Notifier* notifier; OwnHandle wait_packet; OwnHandle wait_event; }; struct ThreadData { static ThreadData* the() { thread_local OwnPtr thread_data = make(); if (thread_data) return &*thread_data; return nullptr; } ThreadData() : wake_data(make()) { wake_data->type = CompletionType::Wake; wake_data->wait_event.handle = CreateEvent(NULL, FALSE, FALSE, NULL); // Consider a way for different event loops to have a different number of threads iocp.handle = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 1); VERIFY(iocp.handle); NTSTATUS status = g_system.NtCreateWaitCompletionPacket(&wake_data->wait_packet.handle, GENERIC_READ | GENERIC_WRITE, NULL); VERIFY(NT_SUCCESS(status)); status = g_system.NtAssociateWaitCompletionPacket(wake_data->wait_packet.handle, iocp.handle, wake_data->wait_event.handle, wake_data.ptr(), NULL, 0, 0, NULL); VERIFY(NT_SUCCESS(status)); } ~ThreadData() { NTSTATUS status = g_system.NtCancelWaitCompletionPacket(wake_data->wait_packet.handle, TRUE); VERIFY(NT_SUCCESS(status)); } OwnHandle iocp; // These are only used to register and unregister. The event loop doesn't access these. HashMap> timers; HashMap> notifiers; // The wake completion packet is posted to the thread's event loop to wake it. NonnullOwnPtr wake_data; }; EventLoopImplementationWindows::EventLoopImplementationWindows() : m_wake_event(ThreadData::the()->wake_data->wait_event.handle) { } EventLoopImplementationWindows::~EventLoopImplementationWindows() { } int EventLoopImplementationWindows::exec() { for (;;) { if (m_exit_requested) return m_exit_code; pump(PumpMode::WaitForEvents); } VERIFY_NOT_REACHED(); } static constexpr bool debug_event_loop = false; size_t EventLoopImplementationWindows::pump(PumpMode pump_mode) { auto& event_queue = ThreadEventQueue::current(); auto* thread_data = ThreadData::the(); // NOTE: The number of entries to dequeue is to be optimized. Ideally we always dequeue all outstanding packets, // but we don't want to increase the cost of each pump unnecessarily. If more than one entry is never dequeued // at once, we could switch to using GetQueuedCompletionStatus which directly returns the values. constexpr ULONG entry_count = 32; OVERLAPPED_ENTRY entries[entry_count]; ULONG entries_removed = 0; bool has_pending_events = event_queue.has_pending_events(); DWORD timeout = 0; if (!has_pending_events && pump_mode == PumpMode::WaitForEvents) timeout = INFINITE; BOOL success = GetQueuedCompletionStatusEx(thread_data->iocp.handle, entries, entry_count, &entries_removed, timeout, FALSE); dbgln_if(debug_event_loop, "Event loop dequed {} events", entries_removed); if (success) { for (ULONG i = 0; i < entries_removed; i++) { auto& entry = entries[i]; auto* packet = reinterpret_cast(entry.lpCompletionKey); if (packet->type == CompletionType::Wake) { auto* wake_data = static_cast(packet); NTSTATUS status = g_system.NtAssociateWaitCompletionPacket(wake_data->wait_packet.handle, thread_data->iocp.handle, wake_data->wait_event.handle, wake_data, NULL, 0, 0, NULL); VERIFY(NT_SUCCESS(status)); continue; } if (packet->type == CompletionType::Timer) { auto* timer = static_cast(packet); if (auto owner = timer->owner.strong_ref()) event_queue.post_event(*owner, make()); if (timer->is_periodic) { NTSTATUS status = g_system.NtAssociateWaitCompletionPacket(timer->wait_packet.handle, thread_data->iocp.handle, timer->timer.handle, timer, NULL, 0, 0, NULL); VERIFY(NT_SUCCESS(status)); } continue; } if (packet->type == CompletionType::Notifer) { auto* notifier_data = static_cast(packet); event_queue.post_event(*notifier_data->notifier, make()); NTSTATUS status = g_system.NtAssociateWaitCompletionPacket(notifier_data->wait_packet.handle, thread_data->iocp.handle, notifier_data->wait_event.handle, notifier_data, NULL, 0, 0, NULL); VERIFY(NT_SUCCESS(status)); continue; } VERIFY_NOT_REACHED(); } } else { DWORD error = GetLastError(); switch (error) { case WAIT_TIMEOUT: break; default: dbgln("GetQueuedCompletionStatusEx failed with unexpected error: {}", Error::from_windows_error(error)); VERIFY_NOT_REACHED(); } } return event_queue.process(); } void EventLoopImplementationWindows::quit(int code) { m_exit_requested = true; m_exit_code = code; } void EventLoopImplementationWindows::post_event(EventReceiver& receiver, NonnullOwnPtr&& event) { m_thread_event_queue.post_event(receiver, move(event)); if (&m_thread_event_queue != &ThreadEventQueue::current()) wake(); } void EventLoopImplementationWindows::wake() { SetEvent(m_wake_event); } static int notifier_type_to_network_event(NotificationType type) { switch (type) { case NotificationType::Read: return FD_READ | FD_CLOSE | FD_ACCEPT; case NotificationType::Write: return FD_WRITE; default: dbgln("This notification type is not implemented: {}", (int)type); VERIFY_NOT_REACHED(); } } void EventLoopManagerWindows::register_notifier(Notifier& notifier) { auto* thread_data = ThreadData::the(); auto& notifiers = thread_data->notifiers; if (notifiers.contains(¬ifier)) return; HANDLE event = CreateEvent(NULL, FALSE, FALSE, NULL); VERIFY(event); int rc = WSAEventSelect(notifier.fd(), event, notifier_type_to_network_event(notifier.type())); VERIFY(!rc); auto notifier_data = make(); notifier_data->type = CompletionType::Notifer; notifier_data->notifier = ¬ifier; notifier_data->wait_event.handle = event; NTSTATUS status = g_system.NtCreateWaitCompletionPacket(¬ifier_data->wait_packet.handle, GENERIC_READ | GENERIC_WRITE, NULL); VERIFY(NT_SUCCESS(status)); status = g_system.NtAssociateWaitCompletionPacket(notifier_data->wait_packet.handle, thread_data->iocp.handle, event, notifier_data.ptr(), NULL, 0, 0, NULL); VERIFY(NT_SUCCESS(status)); notifiers.set(¬ifier, move(notifier_data)); } void EventLoopManagerWindows::unregister_notifier(Notifier& notifier) { auto* thread_data = ThreadData::the(); VERIFY(thread_data); auto& notifiers = thread_data->notifiers; auto maybe_notifier_data = notifiers.take(¬ifier); if (!maybe_notifier_data.has_value()) return; auto notifier_data = move(maybe_notifier_data.value()); // We are removing the signalled packets since the caller no longer expects them NTSTATUS status = g_system.NtCancelWaitCompletionPacket(notifier_data->wait_packet.handle, TRUE); VERIFY(NT_SUCCESS(status)); // TODO: Reuse the data structure } intptr_t EventLoopManagerWindows::register_timer(EventReceiver& object, int milliseconds, bool should_reload) { VERIFY(milliseconds >= 0); auto* thread_data = ThreadData::the(); VERIFY(thread_data); auto& timers = thread_data->timers; // FIXME: This is a temporary fix for issue #3641 bool manual_reset = static_cast(object).is_single_shot(); HANDLE timer = CreateWaitableTimer(NULL, manual_reset, NULL); VERIFY(timer); auto timer_data = make(); timer_data->type = CompletionType::Timer; timer_data->timer.handle = timer; timer_data->owner = object.make_weak_ptr(); timer_data->is_periodic = should_reload; VERIFY(timer_data->timer.handle); NTSTATUS status = g_system.NtCreateWaitCompletionPacket(&timer_data->wait_packet.handle, GENERIC_READ | GENERIC_WRITE, NULL); VERIFY(NT_SUCCESS(status)); LARGE_INTEGER first_time = {}; // Measured in 0.1μs intervals, negative means starting from now first_time.QuadPart = -10'000LL * milliseconds; BOOL succeeded = SetWaitableTimer(timer_data->timer.handle, &first_time, should_reload ? milliseconds : 0, NULL, NULL, FALSE); VERIFY(succeeded); status = g_system.NtAssociateWaitCompletionPacket(timer_data->wait_packet.handle, thread_data->iocp.handle, timer_data->timer.handle, timer_data.ptr(), NULL, 0, 0, NULL); VERIFY(NT_SUCCESS(status)); auto timer_id = reinterpret_cast(timer_data.ptr()); VERIFY(!timers.get(timer_id).has_value()); timers.set(timer_id, move(timer_data)); return timer_id; } void EventLoopManagerWindows::unregister_timer(intptr_t timer_id) { if (auto* thread_data = ThreadData::the()) { auto maybe_timer = thread_data->timers.take(timer_id); if (!maybe_timer.has_value()) return; auto timer = move(maybe_timer.value()); NTSTATUS status = g_system.NtCancelWaitCompletionPacket(timer->wait_packet.handle, TRUE); VERIFY(NT_SUCCESS(status)); } } int EventLoopManagerWindows::register_signal([[maybe_unused]] int signal_number, [[maybe_unused]] Function handler) { dbgln("Core::EventLoopManagerWindows::register_signal() is not implemented"); VERIFY_NOT_REACHED(); } void EventLoopManagerWindows::unregister_signal([[maybe_unused]] int handler_id) { dbgln("Core::EventLoopManagerWindows::unregister_signal() is not implemented"); VERIFY_NOT_REACHED(); } void EventLoopManagerWindows::did_post_event() { } NonnullOwnPtr EventLoopManagerWindows::make_implementation() { return make(); } }