Merge pull request #90268 from RandomShaper/wtp_servers

Use WorkerThreadPool for Server threads (enhanced)
This commit is contained in:
Rémi Verschelde 2024-04-15 10:12:00 +02:00 committed by GitHub
commit c951421c99
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
38 changed files with 462 additions and 394 deletions

View file

@ -33,6 +33,7 @@
#include "core/config/project_settings.h"
#include "core/math/random_number_generator.h"
#include "core/object/worker_thread_pool.h"
#include "core/os/os.h"
#include "core/os/thread.h"
#include "core/templates/command_queue_mt.h"
@ -100,7 +101,7 @@ public:
ThreadWork reader_threadwork;
ThreadWork writer_threadwork;
CommandQueueMT command_queue = CommandQueueMT(true);
CommandQueueMT command_queue;
enum TestMsgType {
TEST_MSG_FUNC1_TRANSFORM,
@ -119,6 +120,7 @@ public:
bool exit_threads = false;
Thread reader_thread;
WorkerThreadPool::TaskID reader_task_id = WorkerThreadPool::INVALID_TASK_ID;
Thread writer_thread;
int func1_count = 0;
@ -148,11 +150,16 @@ public:
void reader_thread_loop() {
reader_threadwork.thread_wait_for_work();
while (!exit_threads) {
if (message_count_to_read < 0) {
if (reader_task_id == WorkerThreadPool::INVALID_TASK_ID) {
command_queue.flush_all();
}
for (int i = 0; i < message_count_to_read; i++) {
command_queue.wait_and_flush();
} else {
if (message_count_to_read < 0) {
command_queue.flush_all();
}
for (int i = 0; i < message_count_to_read; i++) {
WorkerThreadPool::get_singleton()->yield();
command_queue.wait_and_flush();
}
}
message_count_to_read = 0;
@ -216,8 +223,13 @@ public:
sts->writer_thread_loop();
}
void init_threads() {
reader_thread.start(&SharedThreadState::static_reader_thread_loop, this);
void init_threads(bool p_use_thread_pool_sync = false) {
if (p_use_thread_pool_sync) {
reader_task_id = WorkerThreadPool::get_singleton()->add_native_task(&SharedThreadState::static_reader_thread_loop, this, true);
command_queue.set_pump_task_id(reader_task_id);
} else {
reader_thread.start(&SharedThreadState::static_reader_thread_loop, this);
}
writer_thread.start(&SharedThreadState::static_writer_thread_loop, this);
}
void destroy_threads() {
@ -225,16 +237,20 @@ public:
reader_threadwork.main_start_work();
writer_threadwork.main_start_work();
reader_thread.wait_to_finish();
if (reader_task_id != WorkerThreadPool::INVALID_TASK_ID) {
WorkerThreadPool::get_singleton()->wait_for_task_completion(reader_task_id);
} else {
reader_thread.wait_to_finish();
}
writer_thread.wait_to_finish();
}
};
TEST_CASE("[CommandQueue] Test Queue Basics") {
static void test_command_queue_basic(bool p_use_thread_pool_sync) {
const char *COMMAND_QUEUE_SETTING = "memory/limits/command_queue/multithreading_queue_size_kb";
ProjectSettings::get_singleton()->set_setting(COMMAND_QUEUE_SETTING, 1);
SharedThreadState sts;
sts.init_threads();
sts.init_threads(p_use_thread_pool_sync);
sts.add_msg_to_write(SharedThreadState::TEST_MSG_FUNC1_TRANSFORM);
sts.writer_threadwork.main_start_work();
@ -272,6 +288,14 @@ TEST_CASE("[CommandQueue] Test Queue Basics") {
ProjectSettings::get_singleton()->property_get_revert(COMMAND_QUEUE_SETTING));
}
TEST_CASE("[CommandQueue] Test Queue Basics") {
test_command_queue_basic(false);
}
TEST_CASE("[CommandQueue] Test Queue Basics with WorkerThreadPool sync.") {
test_command_queue_basic(true);
}
TEST_CASE("[CommandQueue] Test Queue Wrapping to same spot.") {
const char *COMMAND_QUEUE_SETTING = "memory/limits/command_queue/multithreading_queue_size_kb";
ProjectSettings::get_singleton()->set_setting(COMMAND_QUEUE_SETTING, 1);