/* * Copyright (c) 2023, Andreas Kling * * SPDX-License-Identifier: BSD-2-Clause */ #include #include #include #include #include #include #include #include namespace Core { struct ThreadEventQueue::Private { struct QueuedEvent { AK_MAKE_NONCOPYABLE(QueuedEvent); AK_MAKE_DEFAULT_MOVABLE(QueuedEvent); public: QueuedEvent(RefPtr const& receiver, NonnullOwnPtr event) : receiver(receiver) , event(move(event)) , event_type(this->event->type()) { } QueuedEvent(RefPtr const& receiver, Event::Type event_type) : receiver(receiver) , event_type(event_type) { } QueuedEvent(Function&& invokee) : m_invokee(move(invokee)) , event_type(Event::Type::DeferredInvoke) { } ~QueuedEvent() = default; WeakPtr receiver; OwnPtr event; Function m_invokee; u8 event_type { Event::Type::Invalid }; }; Threading::Mutex mutex; Vector queued_events; Vector>>, 16> pending_promises; bool warned_promise_count { false }; }; static pthread_key_t s_current_thread_event_queue_key; static pthread_once_t s_current_thread_event_queue_key_once = PTHREAD_ONCE_INIT; ThreadEventQueue& ThreadEventQueue::current() { pthread_once(&s_current_thread_event_queue_key_once, [] { pthread_key_create(&s_current_thread_event_queue_key, [](void* value) { if (value) delete static_cast(value); }); }); auto* ptr = static_cast(pthread_getspecific(s_current_thread_event_queue_key)); if (!ptr) { ptr = new ThreadEventQueue; pthread_setspecific(s_current_thread_event_queue_key, ptr); } return *ptr; } ThreadEventQueue::ThreadEventQueue() : m_private(make()) { } ThreadEventQueue::~ThreadEventQueue() = default; void ThreadEventQueue::post_event(Core::EventReceiver* receiver, NonnullOwnPtr event) { { Threading::MutexLocker lock(m_private->mutex); m_private->queued_events.empend(receiver, move(event)); } Core::EventLoopManager::the().did_post_event(); } void ThreadEventQueue::post_event(Core::EventReceiver* receiver, Core::Event::Type event_type) { { Threading::MutexLocker lock(m_private->mutex); m_private->queued_events.empend(receiver, event_type); } Core::EventLoopManager::the().did_post_event(); } void ThreadEventQueue::deferred_invoke(Function&& invokee) { { Threading::MutexLocker lock(m_private->mutex); m_private->queued_events.empend(move(invokee)); } Core::EventLoopManager::the().did_post_event(); } void ThreadEventQueue::add_job(NonnullRefPtr>> promise) { Threading::MutexLocker lock(m_private->mutex); m_private->pending_promises.append(move(promise)); } void ThreadEventQueue::cancel_all_pending_jobs() { Threading::MutexLocker lock(m_private->mutex); for (auto const& promise : m_private->pending_promises) promise->reject(Error::from_errno(ECANCELED)); m_private->pending_promises.clear(); } size_t ThreadEventQueue::process() { decltype(m_private->queued_events) events; { Threading::MutexLocker locker(m_private->mutex); events = move(m_private->queued_events); m_private->pending_promises.remove_all_matching([](auto& job) { return job->is_resolved() || job->is_rejected(); }); } size_t processed_events = 0; for (auto& queued_event : events) { if (auto receiver = queued_event.receiver.strong_ref()) { switch (queued_event.event_type) { case Event::Type::Timer: { TimerEvent timer_event; receiver->dispatch_event(timer_event); break; } case Event::Type::NotifierActivation: { NotifierActivationEvent notifier_activation_event; receiver->dispatch_event(notifier_activation_event); break; } default: receiver->dispatch_event(*queued_event.event); break; } } else { if (queued_event.event_type == Event::Type::DeferredInvoke) { queued_event.m_invokee(); } else { // Receiver gone, drop the event. } } ++processed_events; } { Threading::MutexLocker locker(m_private->mutex); if (m_private->pending_promises.size() > 30 && !m_private->warned_promise_count) { m_private->warned_promise_count = true; dbgln("ThreadEventQueue::process: Job queue wasn't designed for this load ({} promises)", m_private->pending_promises.size()); } } return processed_events; } bool ThreadEventQueue::has_pending_events() const { Threading::MutexLocker locker(m_private->mutex); return !m_private->queued_events.is_empty(); } }