// Patch summary (free-text preamble placed before the first "diff --git" header).
//
// Purpose: lets WorkerThreadPool callers mark a task as a "pump task" and makes
// the pool treat such tasks specially when posting, when waiting collaboratively,
// and when deciding how many worker threads exist.
//
// core/object/worker_thread_pool.h
//   - Task gains the bit-field `is_pump_task`; ThreadData gains the bit-field
//     `has_pump_task`; both are initialised to false in their constructors.
//   - The pool gains `int pump_task_count = 0`.
//   - `_post_tasks` takes a new trailing `bool p_pump_task`.
//   - `_add_task` and `add_task` take a new trailing `bool p_pump_task = false`.
//   - New declaration `add_task_bind` with the previous three-parameter shape.
//
// core/object/worker_thread_pool.cpp
//   - `_process_task`: copies `p_task->is_pump_task` into
//     `curr_thread.has_pump_task` when a task becomes the thread's current task.
//   - `_post_tasks`: `process_on_calling_thread` is now
//     `threads.is_empty() && p_high_priority && !p_pump_task` (was
//     `threads.is_empty()`).
//   - `_add_task`: stores `p_pump_task` on the task. Under THREADS_ENABLED, for a
//     pump task it increments `pump_task_count`; when that count is >=
//     `get_thread_count()` it logs via print_verbose, checks with CRASH_COND_MSG
//     that `thread_count + 1` does not exceed `threads.get_capacity()`, grows
//     `threads` by one with `resize_initialized`, starts the new thread on
//     `_thread_function`, and records its id in `thread_ids`. On __APPLE__ the
//     new thread's stack size is set to 2 MiB (DEV_ENABLED) or 1 MiB.
//   - `add_task` forwards `p_pump_task`; `add_task_bind` forwards `false`.
//   - `_wait_collaboratively`: if the task at the front of `task_queue` is a pump
//     task and the caller is YIELDING or has `has_pump_task` set, the task is left
//     in the queue, `task_to_process` is reset to nullptr and
//     `_notify_threads(p_caller_pool_thread, 1, 0)` is called; otherwise the front
//     task is removed from the queue as before.
//   - `_add_group_task`: passes `false` for the new `_post_tasks` argument.
//   - `init`: under THREADS_ENABLED calls `threads.reserve(5)` before
//     `threads.resize(p_thread_count)`.
//   - `_bind_methods`: the "add_task" binding (action, high_priority,
//     description) now points at `add_task_bind`, so the bound argument list is
//     unchanged.
//
// modules/betsy/image_compress_betsy.cpp, servers/physics_server_2d_wrap_mt.cpp,
// servers/physics_server_3d_wrap_mt.cpp,
// servers/rendering/rendering_server_default.cpp
//   - Each `init()` now passes a description string and `true` for `p_pump_task`
//     when adding its `_thread_loop` task.
//
// NOTE(review): this copy of the patch has its line structure flattened (many
// diff lines per physical line) and a few declarations look truncated
// ("SelfList task_elem;", "TightLocalVector threads;", "static HashMap
// named_pools;", a bare "template") -- presumably template argument lists were
// lost; confirm against the original patch before applying.
// NOTE(review): `pump_task_count` is only ever incremented in the hunks shown;
// confirm whether it should be decremented when a pump task finishes.
// NOTE(review): when `threads.is_empty()`, low-priority and pump tasks no longer
// run on the calling thread; the rest of `_post_tasks` is outside these hunks --
// confirm such tasks are still executed in builds without worker threads.
// NOTE(review): `threads.reserve(5)` is a fixed number while the runtime growth
// path crashes if `thread_count + 1 > threads.get_capacity()`; confirm the
// capacity left after `resize(p_thread_count)` when p_thread_count >= 5.
// NOTE(review): `has_pump_task` is overwritten for every processed task and no
// restore is visible here; confirm the value is correct after a nested task
// returns to a thread that is running a pump task.
// NOTE(review): `_wait_collaboratively` only inspects the front of `task_queue`;
// confirm non-pump tasks queued behind a pump task cannot be starved.
diff --git a/core/object/worker_thread_pool.cpp b/core/object/worker_thread_pool.cpp index e0a8d558c9a..70057b5a9d2 100644 --- a/core/object/worker_thread_pool.cpp +++ b/core/object/worker_thread_pool.cpp @@ -76,6 +76,7 @@ void WorkerThreadPool::_process_task(Task *p_task) { p_task->pool_thread_index = pool_thread_index; prev_task = curr_thread.current_task; curr_thread.current_task = p_task; + curr_thread.has_pump_task = p_task->is_pump_task; if (p_task->pending_notify_yield_over) { curr_thread.yield_is_over = true; } @@ -218,11 +219,13 @@ void WorkerThreadPool::_thread_function(void *p_user) { } } -void WorkerThreadPool::_post_tasks(Task **p_tasks, uint32_t p_count, bool p_high_priority, MutexLock &p_lock) { +void WorkerThreadPool::_post_tasks(Task **p_tasks, uint32_t p_count, bool p_high_priority, MutexLock &p_lock, bool p_pump_task) { // Fall back to processing on the calling thread if there are no worker threads. // Separated into its own variable to make it easier to extend this logic // in custom builds. - bool process_on_calling_thread = threads.is_empty(); + + // Avoid calling pump tasks or low priority tasks from the calling thread. 
+ bool process_on_calling_thread = threads.is_empty() && p_high_priority && !p_pump_task; if (process_on_calling_thread) { p_lock.temp_unlock(); for (uint32_t i = 0; i < p_count; i++) { @@ -339,7 +342,7 @@ WorkerThreadPool::TaskID WorkerThreadPool::add_native_task(void (*p_func)(void * return _add_task(Callable(), p_func, p_userdata, nullptr, p_high_priority, p_description); } -WorkerThreadPool::TaskID WorkerThreadPool::_add_task(const Callable &p_callable, void (*p_func)(void *), void *p_userdata, BaseTemplateUserdata *p_template_userdata, bool p_high_priority, const String &p_description) { +WorkerThreadPool::TaskID WorkerThreadPool::_add_task(const Callable &p_callable, void (*p_func)(void *), void *p_userdata, BaseTemplateUserdata *p_template_userdata, bool p_high_priority, const String &p_description, bool p_pump_task) { MutexLock lock(task_mutex); // Get a free task @@ -351,15 +354,50 @@ WorkerThreadPool::TaskID WorkerThreadPool::_add_task(const Callable &p_callable, task->native_func_userdata = p_userdata; task->description = p_description; task->template_userdata = p_template_userdata; + task->is_pump_task = p_pump_task; tasks.insert(id, task); - _post_tasks(&task, 1, p_high_priority, lock); +#ifdef THREADS_ENABLED + if (p_pump_task) { + pump_task_count++; + int thread_count = get_thread_count(); + if (pump_task_count >= thread_count) { + print_verbose(vformat("A greater number of dedicated threads were requested (%d) than threads available (%d). Please increase the number of available worker task threads. Recovering this session by spawning more worker task threads.", pump_task_count + 1, thread_count)); // +1 because we want to keep a Thread without any pump tasks free. + + Thread::Settings settings; +#ifdef __APPLE__ + // The default stack size for new threads on Apple platforms is 512KiB. + // This is insufficient when using a library like SPIRV-Cross, + // which can generate deep stacks and result in a stack overflow. 
+#ifdef DEV_ENABLED + // Debug builds need an even larger stack size. + settings.stack_size = 2 * 1024 * 1024; // 2 MiB +#else + settings.stack_size = 1 * 1024 * 1024; // 1 MiB +#endif +#endif + // Re-sizing implies relocation, which is not supported for this array. + CRASH_COND_MSG(thread_count + 1 > (int)threads.get_capacity(), "Reserve trick for worker thread pool failed. Crashing."); + threads.resize_initialized(thread_count + 1); + threads[thread_count].index = thread_count; + threads[thread_count].pool = this; + threads[thread_count].thread.start(&WorkerThreadPool::_thread_function, &threads[thread_count], settings); + thread_ids.insert(threads[thread_count].thread.get_id(), thread_count); + } + } +#endif + + _post_tasks(&task, 1, p_high_priority, lock, p_pump_task); return id; } -WorkerThreadPool::TaskID WorkerThreadPool::add_task(const Callable &p_action, bool p_high_priority, const String &p_description) { - return _add_task(p_action, nullptr, nullptr, nullptr, p_high_priority, p_description); +WorkerThreadPool::TaskID WorkerThreadPool::add_task(const Callable &p_action, bool p_high_priority, const String &p_description, bool p_pump_task) { + return _add_task(p_action, nullptr, nullptr, nullptr, p_high_priority, p_description, p_pump_task); +} + +WorkerThreadPool::TaskID WorkerThreadPool::add_task_bind(const Callable &p_action, bool p_high_priority, const String &p_description) { + return _add_task(p_action, nullptr, nullptr, nullptr, p_high_priority, p_description, false); } bool WorkerThreadPool::is_task_completed(TaskID p_task_id) const { @@ -510,7 +548,12 @@ void WorkerThreadPool::_wait_collaboratively(ThreadData *p_caller_pool_thread, T if (p_caller_pool_thread->pool->task_queue.first()) { task_to_process = task_queue.first()->self(); - task_queue.remove(task_queue.first()); + if ((p_task == ThreadData::YIELDING || p_caller_pool_thread->has_pump_task == true) && task_to_process->is_pump_task) { + task_to_process = nullptr; + 
_notify_threads(p_caller_pool_thread, 1, 0); + } else { + task_queue.remove(task_queue.first()); + } } if (!task_to_process) { @@ -661,7 +704,7 @@ WorkerThreadPool::GroupID WorkerThreadPool::_add_group_task(const Callable &p_ca groups[id] = group; - _post_tasks(tasks_posted, p_tasks, p_high_priority, lock); + _post_tasks(tasks_posted, p_tasks, p_high_priority, lock, false); return id; } @@ -788,6 +831,11 @@ void WorkerThreadPool::init(int p_thread_count, float p_low_priority_task_ratio) print_verbose(vformat("WorkerThreadPool: %d threads, %d max low-priority.", p_thread_count, max_low_priority_threads)); +#ifdef THREADS_ENABLED + // Reserve 5 threads in case we need separate threads for 1) 2D physics 2) 3D physics 3) rendering 4) GPU texture compression, 5) all other tasks. + // We cannot safely increase the Vector size at runtime, so reserve enough up front, but only launch those needed. + threads.reserve(5); +#endif threads.resize(p_thread_count); Thread::Settings settings; @@ -862,7 +910,7 @@ void WorkerThreadPool::finish() { } void WorkerThreadPool::_bind_methods() { - ClassDB::bind_method(D_METHOD("add_task", "action", "high_priority", "description"), &WorkerThreadPool::add_task, DEFVAL(false), DEFVAL(String())); + ClassDB::bind_method(D_METHOD("add_task", "action", "high_priority", "description"), &WorkerThreadPool::add_task_bind, DEFVAL(false), DEFVAL(String())); ClassDB::bind_method(D_METHOD("is_task_completed", "task_id"), &WorkerThreadPool::is_task_completed); ClassDB::bind_method(D_METHOD("wait_for_task_completion", "task_id"), &WorkerThreadPool::wait_for_task_completion); ClassDB::bind_method(D_METHOD("get_caller_task_id"), &WorkerThreadPool::get_caller_task_id); diff --git a/core/object/worker_thread_pool.h b/core/object/worker_thread_pool.h index 9a26b88348e..7e36f73e90c 100644 --- a/core/object/worker_thread_pool.h +++ b/core/object/worker_thread_pool.h @@ -80,6 +80,7 @@ private: Semaphore done_semaphore; // For user threads awaiting. 
bool completed : 1; bool pending_notify_yield_over : 1; + bool is_pump_task : 1; Group *group = nullptr; SelfList task_elem; uint32_t waiting_pool = 0; @@ -92,6 +93,7 @@ private: Task() : completed(false), pending_notify_yield_over(false), + is_pump_task(false), task_elem(this) {} }; @@ -115,6 +117,7 @@ private: bool yield_is_over : 1; bool pre_exited_languages : 1; bool exited_languages : 1; + bool has_pump_task : 1; // Threads can only have one pump task. Task *current_task = nullptr; Task *awaited_task = nullptr; // Null if not awaiting the condition variable, or special value (YIELDING). ConditionVariable cond_var; @@ -124,7 +127,8 @@ private: signaled(false), yield_is_over(false), pre_exited_languages(false), - exited_languages(false) {} + exited_languages(false), + has_pump_task(false) {} }; TightLocalVector threads; @@ -165,6 +169,7 @@ private: uint32_t notify_index = 0; // For rotating across threads, no help distributing load. uint64_t last_task = 1; + int pump_task_count = 0; static HashMap named_pools; @@ -172,7 +177,7 @@ private: void _process_task(Task *task); - void _post_tasks(Task **p_tasks, uint32_t p_count, bool p_high_priority, MutexLock &p_lock); + void _post_tasks(Task **p_tasks, uint32_t p_count, bool p_high_priority, MutexLock &p_lock, bool p_pump_task); void _notify_threads(const ThreadData *p_current_thread_data, uint32_t p_process_count, uint32_t p_promote_count); bool _try_promote_low_priority_task(); @@ -188,7 +193,7 @@ private: static thread_local UnlockableLocks unlockable_locks[MAX_UNLOCKABLE_LOCKS]; #endif - TaskID _add_task(const Callable &p_callable, void (*p_func)(void *), void *p_userdata, BaseTemplateUserdata *p_template_userdata, bool p_high_priority, const String &p_description); + TaskID _add_task(const Callable &p_callable, void (*p_func)(void *), void *p_userdata, BaseTemplateUserdata *p_template_userdata, bool p_high_priority, const String &p_description, bool p_pump_task = false); GroupID _add_group_task(const Callable 
&p_callable, void (*p_func)(void *, uint32_t), void *p_userdata, BaseTemplateUserdata *p_template_userdata, int p_elements, int p_tasks, bool p_high_priority, const String &p_description); template @@ -237,7 +242,8 @@ public: return _add_task(Callable(), nullptr, nullptr, ud, p_high_priority, p_description); } TaskID add_native_task(void (*p_func)(void *), void *p_userdata, bool p_high_priority = false, const String &p_description = String()); - TaskID add_task(const Callable &p_action, bool p_high_priority = false, const String &p_description = String()); + TaskID add_task(const Callable &p_action, bool p_high_priority = false, const String &p_description = String(), bool p_pump_task = false); + TaskID add_task_bind(const Callable &p_action, bool p_high_priority = false, const String &p_description = String()); bool is_task_completed(TaskID p_task_id) const; Error wait_for_task_completion(TaskID p_task_id); diff --git a/modules/betsy/image_compress_betsy.cpp b/modules/betsy/image_compress_betsy.cpp index 6a81a9f1a22..df4f8613ac1 100644 --- a/modules/betsy/image_compress_betsy.cpp +++ b/modules/betsy/image_compress_betsy.cpp @@ -223,7 +223,7 @@ void BetsyCompressor::_init() { } void BetsyCompressor::init() { - WorkerThreadPool::TaskID tid = WorkerThreadPool::get_singleton()->add_task(callable_mp(this, &BetsyCompressor::_thread_loop), true); + WorkerThreadPool::TaskID tid = WorkerThreadPool::get_singleton()->add_task(callable_mp(this, &BetsyCompressor::_thread_loop), true, "Betsy pump task", true); command_queue.set_pump_task_id(tid); command_queue.push(this, &BetsyCompressor::_assign_mt_ids, tid); command_queue.push_and_sync(this, &BetsyCompressor::_init); diff --git a/servers/physics_server_2d_wrap_mt.cpp b/servers/physics_server_2d_wrap_mt.cpp index 991bbfb6a0a..54e1472e1a8 100644 --- a/servers/physics_server_2d_wrap_mt.cpp +++ b/servers/physics_server_2d_wrap_mt.cpp @@ -75,7 +75,7 @@ void PhysicsServer2DWrapMT::end_sync() { void PhysicsServer2DWrapMT::init() { 
if (create_thread) { - WorkerThreadPool::TaskID tid = WorkerThreadPool::get_singleton()->add_task(callable_mp(this, &PhysicsServer2DWrapMT::_thread_loop), true); + WorkerThreadPool::TaskID tid = WorkerThreadPool::get_singleton()->add_task(callable_mp(this, &PhysicsServer2DWrapMT::_thread_loop), true, "Physics server 2D pump task", true); command_queue.set_pump_task_id(tid); command_queue.push(this, &PhysicsServer2DWrapMT::_assign_mt_ids, tid); command_queue.push_and_sync(physics_server_2d, &PhysicsServer2D::init); diff --git a/servers/physics_server_3d_wrap_mt.cpp b/servers/physics_server_3d_wrap_mt.cpp index 2972cbd3537..a1bf65ba90e 100644 --- a/servers/physics_server_3d_wrap_mt.cpp +++ b/servers/physics_server_3d_wrap_mt.cpp @@ -75,7 +75,7 @@ void PhysicsServer3DWrapMT::end_sync() { void PhysicsServer3DWrapMT::init() { if (create_thread) { - WorkerThreadPool::TaskID tid = WorkerThreadPool::get_singleton()->add_task(callable_mp(this, &PhysicsServer3DWrapMT::_thread_loop), true); + WorkerThreadPool::TaskID tid = WorkerThreadPool::get_singleton()->add_task(callable_mp(this, &PhysicsServer3DWrapMT::_thread_loop), true, "Physics server 3D pump task", true); command_queue.set_pump_task_id(tid); command_queue.push(this, &PhysicsServer3DWrapMT::_assign_mt_ids, tid); command_queue.push_and_sync(physics_server_3d, &PhysicsServer3D::init); diff --git a/servers/rendering/rendering_server_default.cpp b/servers/rendering/rendering_server_default.cpp index 09c532aa0bc..943f3e8bbbe 100644 --- a/servers/rendering/rendering_server_default.cpp +++ b/servers/rendering/rendering_server_default.cpp @@ -253,7 +253,7 @@ void RenderingServerDefault::init() { if (create_thread) { print_verbose("RenderingServerWrapMT: Starting render thread"); DisplayServer::get_singleton()->release_rendering_thread(); - WorkerThreadPool::TaskID tid = WorkerThreadPool::get_singleton()->add_task(callable_mp(this, &RenderingServerDefault::_thread_loop), true); + WorkerThreadPool::TaskID tid = 
WorkerThreadPool::get_singleton()->add_task(callable_mp(this, &RenderingServerDefault::_thread_loop), true, "Rendering Server pump task", true); command_queue.set_pump_task_id(tid); command_queue.push(this, &RenderingServerDefault::_assign_mt_ids, tid); command_queue.push_and_sync(this, &RenderingServerDefault::_init);