From 9e5292c6cf8f0efbd73bb976a6af168b28c721a1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pa=CC=84vels=20Nadtoc=CC=8Cajevs?= <7645683+bruvzg@users.noreply.github.com> Date: Mon, 3 Mar 2025 15:09:37 +0200 Subject: [PATCH] Use separate WorkThreadPool for shader compiler. (cherry picked from commit 53bb897458a9761d00936ad53ea95bd78b4a6b0b) --- core/object/worker_thread_pool.cpp | 68 +++++++++++++++------ core/object/worker_thread_pool.h | 12 +++- servers/rendering/renderer_rd/shader_rd.cpp | 5 +- 3 files changed, 61 insertions(+), 24 deletions(-) diff --git a/core/object/worker_thread_pool.cpp b/core/object/worker_thread_pool.cpp index 08903d61964..d9ff3057679 100644 --- a/core/object/worker_thread_pool.cpp +++ b/core/object/worker_thread_pool.cpp @@ -37,6 +37,8 @@ WorkerThreadPool::Task *const WorkerThreadPool::ThreadData::YIELDING = (Task *)1; +HashMap<StringName, WorkerThreadPool *> WorkerThreadPool::named_pools; + void WorkerThreadPool::Task::free_template_userdata() { ERR_FAIL_NULL(template_userdata); ERR_FAIL_NULL(native_func_userdata); @@ -184,25 +186,25 @@ void WorkerThreadPool::_thread_function(void *p_user) { while (true) { Task *task_to_process = nullptr; { - MutexLock lock(singleton->task_mutex); + MutexLock lock(thread_data->pool->task_mutex); - bool exit = singleton->_handle_runlevel(thread_data, lock); + bool exit = thread_data->pool->_handle_runlevel(thread_data, lock); if (unlikely(exit)) { break; } thread_data->signaled = false; - if (singleton->task_queue.first()) { - task_to_process = singleton->task_queue.first()->self(); - singleton->task_queue.remove(singleton->task_queue.first()); + if (thread_data->pool->task_queue.first()) { + task_to_process = thread_data->pool->task_queue.first()->self(); + thread_data->pool->task_queue.remove(thread_data->pool->task_queue.first()); } else { thread_data->cond_var.wait(lock); } } if (task_to_process) { - singleton->_process_task(task_to_process); + thread_data->pool->_process_task(task_to_process); } } } @@ -497,7 +499,7 @@ void 
WorkerThreadPool::_wait_collaboratively(ThreadData *p_caller_pool_thread, T } } - if (singleton->task_queue.first()) { + if (p_caller_pool_thread->pool->task_queue.first()) { task_to_process = task_queue.first()->self(); task_queue.remove(task_queue.first()); } @@ -505,7 +507,9 @@ void WorkerThreadPool::_wait_collaboratively(ThreadData *p_caller_pool_thread, T if (!task_to_process) { p_caller_pool_thread->awaited_task = p_task; - _unlock_unlockable_mutexes(); + if (this == singleton) { + _unlock_unlockable_mutexes(); + } relock_unlockables = true; p_caller_pool_thread->cond_var.wait(lock); @@ -514,7 +518,7 @@ void WorkerThreadPool::_wait_collaboratively(ThreadData *p_caller_pool_thread, T } } - if (relock_unlockables) { + if (relock_unlockables && this == singleton) { _lock_unlockable_mutexes(); } @@ -690,9 +694,13 @@ void WorkerThreadPool::wait_for_group_task_completion(GroupID p_group) { { Group *group = *groupp; - _unlock_unlockable_mutexes(); + if (this == singleton) { + _unlock_unlockable_mutexes(); + } group->done_semaphore.wait(); - _lock_unlockable_mutexes(); + if (this == singleton) { + _lock_unlockable_mutexes(); + } uint32_t max_users = group->tasks_used + 1; // Add 1 because the thread waiting for it is also user. Read before to avoid another thread freeing task after increment. uint32_t finished_users = group->finished.increment(); // fetch happens before inc, so increment later. @@ -709,15 +717,15 @@ void WorkerThreadPool::wait_for_group_task_completion(GroupID p_group) { #endif } -int WorkerThreadPool::get_thread_index() { +int WorkerThreadPool::get_thread_index() const { Thread::ID tid = Thread::get_caller_id(); - return singleton->thread_ids.has(tid) ? singleton->thread_ids[tid] : -1; + return thread_ids.has(tid) ? 
thread_ids[tid] : -1; } -WorkerThreadPool::TaskID WorkerThreadPool::get_caller_task_id() { +WorkerThreadPool::TaskID WorkerThreadPool::get_caller_task_id() const { int th_index = get_thread_index(); - if (th_index != -1 && singleton->threads[th_index].current_task) { - return singleton->threads[th_index].current_task->self; + if (th_index != -1 && threads[th_index].current_task) { + return threads[th_index].current_task->self; } else { return INVALID_TASK_ID; } @@ -766,6 +774,7 @@ void WorkerThreadPool::init(int p_thread_count, float p_low_priority_task_ratio) for (uint32_t i = 0; i < threads.size(); i++) { threads[i].index = i; + threads[i].pool = this; threads[i].thread.start(&WorkerThreadPool::_thread_function, &threads[i]); thread_ids.insert(threads[i].thread.get_id(), i); } @@ -832,10 +841,33 @@ void WorkerThreadPool::_bind_methods() { ClassDB::bind_method(D_METHOD("wait_for_group_task_completion", "group_id"), &WorkerThreadPool::wait_for_group_task_completion); } -WorkerThreadPool::WorkerThreadPool() { - singleton = this; +WorkerThreadPool *WorkerThreadPool::get_named_pool(const StringName &p_name) { + WorkerThreadPool **pool_ptr = named_pools.getptr(p_name); + if (pool_ptr) { + return *pool_ptr; + } else { + WorkerThreadPool *pool = memnew(WorkerThreadPool(false)); + pool->init(); + named_pools[p_name] = pool; + return pool; + } +} + +WorkerThreadPool::WorkerThreadPool(bool p_singleton) { + if (p_singleton) { + singleton = this; + } } WorkerThreadPool::~WorkerThreadPool() { finish(); + + if (this == singleton) { + singleton = nullptr; + for (KeyValue<StringName, WorkerThreadPool *> &E : named_pools) { + E.value->finish(); + memdelete(E.value); + } + named_pools.clear(); + } } diff --git a/core/object/worker_thread_pool.h b/core/object/worker_thread_pool.h index 58e86e3e48e..76332ec8a3d 100644 --- a/core/object/worker_thread_pool.h +++ b/core/object/worker_thread_pool.h @@ -119,6 +119,7 @@ private: Task *current_task = nullptr; Task *awaited_task = nullptr; // Null if not awaiting the 
condition variable, or special value (YIELDING). ConditionVariable cond_var; + WorkerThreadPool *pool = nullptr; ThreadData() : signaled(false), @@ -166,6 +167,8 @@ private: uint64_t last_task = 1; + static HashMap<StringName, WorkerThreadPool *> named_pools; + static void _thread_function(void *p_user); void _process_task(Task *task); @@ -266,9 +269,12 @@ public: #endif } + // Note: Do not use this unless you know what you are doing, and it is absolutely necessary. Main thread pool (`get_singleton()`) should be preferred instead. + static WorkerThreadPool *get_named_pool(const StringName &p_name); + static WorkerThreadPool *get_singleton() { return singleton; } - static int get_thread_index(); - static TaskID get_caller_task_id(); + int get_thread_index() const; + TaskID get_caller_task_id() const; #ifdef THREADS_ENABLED _ALWAYS_INLINE_ static uint32_t thread_enter_unlock_allowance_zone(const MutexLock<BinaryMutex> &p_lock) { return _thread_enter_unlock_allowance_zone(p_lock._get_lock()); } @@ -285,7 +291,7 @@ public: void init(int p_thread_count = -1, float p_low_priority_task_ratio = 0.3); void exit_languages_threads(); void finish(); - WorkerThreadPool(); + WorkerThreadPool(bool p_singleton = true); ~WorkerThreadPool(); }; diff --git a/servers/rendering/renderer_rd/shader_rd.cpp b/servers/rendering/renderer_rd/shader_rd.cpp index cbdbe151c82..ed9428a6a35 100644 --- a/servers/rendering/renderer_rd/shader_rd.cpp +++ b/servers/rendering/renderer_rd/shader_rd.cpp @@ -555,7 +555,7 @@ void ShaderRD::_compile_version_start(Version *p_version, int p_group) { compile_data.version = p_version; compile_data.group = p_group; - WorkerThreadPool::GroupID group_task = WorkerThreadPool::get_singleton()->add_template_group_task(this, &ShaderRD::_compile_variant, compile_data, group_to_variant_map[p_group].size(), -1, true, SNAME("ShaderCompilation")); + WorkerThreadPool::GroupID group_task = WorkerThreadPool::get_named_pool(SNAME("ShaderCompilationPool"))->add_template_group_task(this, &ShaderRD::_compile_variant, 
compile_data, group_to_variant_map[p_group].size(), -1, true, SNAME("ShaderCompilation")); p_version->group_compilation_tasks.write[p_group] = group_task; } @@ -563,9 +563,8 @@ void ShaderRD::_compile_version_end(Version *p_version, int p_group) { if (p_version->group_compilation_tasks.size() <= p_group || p_version->group_compilation_tasks[p_group] == 0) { return; } - WorkerThreadPool::GroupID group_task = p_version->group_compilation_tasks[p_group]; - WorkerThreadPool::get_singleton()->wait_for_group_task_completion(group_task); + WorkerThreadPool::get_named_pool(SNAME("ShaderCompilationPool"))->wait_for_group_task_completion(group_task); p_version->group_compilation_tasks.write[p_group] = 0; bool all_valid = true;