LibIPC: Change TransportSocket to read and write on I/O thread

Previously, TransportSocket sent queued messages from a separate thread
but performed all reading on the main thread. With this change, both
reading and writing are handled on the same I/O thread. This allows us
to read IPC messages even while the main thread is blocked and to
process them on a different thread (e.g., a rendering thread).
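
The core of the change is a single poll()-based loop. Below is a minimal, hypothetical sketch of that pattern in plain POSIX C++ (not the actual LibIPC/AK code): the I/O thread polls the IPC socket together with a self-pipe, so any other thread can wake it after queueing outgoing data or requesting shutdown. has_queued_output(), flush_some(), read_some() and enqueue_message() are stand-ins for the send queue and the message parser.

#include <atomic>
#include <cerrno>
#include <poll.h>
#include <unistd.h>

bool has_queued_output(); // stand-in: is outgoing data waiting in the send queue?
void flush_some(int fd);  // stand-in: write as much queued data as the socket accepts
void read_some(int fd);   // stand-in: read available bytes, parse complete messages

std::atomic<bool> g_stop { false };

void io_thread_loop(int socket_fd, int wake_read_fd)
{
    while (!g_stop.load(std::memory_order_acquire)) {
        short events = POLLIN;
        if (has_queued_output())
            events |= POLLOUT; // only ask for POLLOUT while there is something to send

        pollfd fds[2] = {
            { socket_fd, events, 0 },
            { wake_read_fd, POLLIN, 0 }, // self-pipe used to interrupt poll()
        };

        int rc;
        do {
            rc = poll(fds, 2, -1); // block until the socket or the wake pipe is ready
        } while (rc < 0 && errno == EINTR);
        if (rc < 0)
            break;

        if (fds[1].revents & POLLIN) { // drain the wake pipe; its only job is to wake us
            char buf[64];
            (void)read(wake_read_fd, buf, sizeof(buf));
        }
        if (fds[0].revents & POLLIN)
            read_some(socket_fd);
        if (fds[0].revents & POLLOUT)
            flush_some(socket_fd);
        if (fds[0].revents & (POLLHUP | POLLERR))
            break; // peer hung up or socket error: stop the loop
    }
}

// A writer on any thread queues its message, then pokes the wake pipe:
//     enqueue_message(bytes, fds);
//     (void)write(wake_write_fd, "x", 1);

Requesting POLLOUT only while the send queue is non-empty keeps the loop from spinning on an always-writable socket, and the wake pipe is what lets other threads interrupt a blocking poll().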
Author: Aliaksandr Kalenik, 2025-10-13 04:58:27 +02:00 (committed by Alexander Kalenik)
Commit: b5db79be6d (parent: 0b20ba1c0f)
2 changed files with 192 additions and 115 deletions

LibIPC/TransportSocket.cpp

@@ -18,15 +18,6 @@ void SendQueue::enqueue_message(Vector<u8>&& bytes, Vector<int>&& fds)
Threading::MutexLocker locker(m_mutex);
VERIFY(MUST(m_stream.write_some(bytes.span())) == bytes.size());
m_fds.append(fds.data(), fds.size());
m_condition.signal();
}
SendQueue::Running SendQueue::block_until_message_enqueued()
{
Threading::MutexLocker locker(m_mutex);
while (m_stream.is_eof() && m_fds.is_empty() && m_running)
m_condition.wait();
return m_running ? Running::Yes : Running::No;
}
SendQueue::BytesAndFds SendQueue::peek(size_t max_bytes)
@@ -52,96 +43,161 @@ void SendQueue::discard(size_t bytes_count, size_t fds_count)
m_fds.remove(0, fds_count);
}
void SendQueue::stop()
{
Threading::MutexLocker locker(m_mutex);
m_running = false;
m_condition.signal();
}
TransportSocket::TransportSocket(NonnullOwnPtr<Core::LocalSocket> socket)
: m_socket(move(socket))
{
m_send_queue = adopt_ref(*new SendQueue);
m_send_thread = Threading::Thread::construct([this, send_queue = m_send_queue]() -> intptr_t {
for (;;) {
if (send_queue->block_until_message_enqueued() == SendQueue::Running::No)
break;
auto [bytes, fds] = send_queue->peek(4096);
ReadonlyBytes remaining_bytes_to_send = bytes;
if (transfer_data(remaining_bytes_to_send, fds) == TransferState::SocketClosed)
break;
}
return 0;
});
m_send_thread->start();
(void)Core::System::setsockopt(m_socket->fd().value(), SOL_SOCKET, SO_SNDBUF, &SOCKET_BUFFER_SIZE, sizeof(SOCKET_BUFFER_SIZE));
(void)Core::System::setsockopt(m_socket->fd().value(), SOL_SOCKET, SO_RCVBUF, &SOCKET_BUFFER_SIZE, sizeof(SOCKET_BUFFER_SIZE));
m_send_queue = adopt_ref(*new SendQueue);
{
auto fds = MUST(Core::System::pipe2(O_CLOEXEC | O_NONBLOCK));
m_wakeup_io_thread_read_fd = adopt_ref(*new AutoCloseFileDescriptor(fds[0]));
m_wakeup_io_thread_write_fd = adopt_ref(*new AutoCloseFileDescriptor(fds[1]));
}
{
auto fds = MUST(Core::System::pipe2(O_CLOEXEC | O_NONBLOCK));
m_notify_hook_read_fd = adopt_ref(*new AutoCloseFileDescriptor(fds[0]));
m_notify_hook_write_fd = adopt_ref(*new AutoCloseFileDescriptor(fds[1]));
}
m_io_thread = Threading::Thread::construct([this] { return io_thread_loop(); });
m_io_thread->start();
}
intptr_t TransportSocket::io_thread_loop()
{
Array<struct pollfd, 2> pollfds;
for (;;) {
auto want_to_write = [&] {
auto [bytes, fds] = m_send_queue->peek(1);
return !bytes.is_empty() || !fds.is_empty();
}();
auto state = m_io_thread_state.load();
if (state == IOThreadState::Stopped)
break;
if (state == IOThreadState::SendPendingMessagesAndStop && !want_to_write) {
m_io_thread_state = IOThreadState::Stopped;
break;
}
short events = POLLIN;
if (want_to_write)
events |= POLLOUT;
pollfds[0] = { .fd = m_socket->fd().value(), .events = events, .revents = 0 };
pollfds[1] = { .fd = m_wakeup_io_thread_read_fd->value(), .events = POLLIN, .revents = 0 };
ErrorOr<int> result { 0 };
do {
result = Core::System::poll(pollfds, -1);
} while (result.is_error() && result.error().code() == EINTR);
if (result.is_error()) {
dbgln("TransportSocket poll error: {}", result.error());
VERIFY_NOT_REACHED();
}
if (pollfds[1].revents & POLLIN) {
char buf[64];
MUST(Core::System::read(m_wakeup_io_thread_read_fd->value(), { buf, sizeof(buf) }));
}
if (pollfds[0].revents & POLLIN)
read_incoming_messages();
if (pollfds[0].revents & POLLHUP) {
m_io_thread_state = IOThreadState::Stopped;
break;
}
if (pollfds[0].revents & (POLLERR | POLLNVAL)) {
VERIFY_NOT_REACHED();
}
if (pollfds[0].revents & POLLOUT) {
auto [bytes, fds] = m_send_queue->peek(4096);
if (!bytes.is_empty() || !fds.is_empty()) {
ReadonlyBytes remaining = bytes;
if (transfer_data(remaining, fds) == TransferState::SocketClosed) {
m_io_thread_state = IOThreadState::Stopped;
}
}
}
}
VERIFY(m_io_thread_state == IOThreadState::Stopped);
m_peer_eof = true;
m_incoming_cv.broadcast();
return 0;
}
void TransportSocket::wake_io_thread()
{
Array<u8, 1> bytes = { 0 };
(void)Core::System::write(m_wakeup_io_thread_write_fd->value(), bytes);
}
TransportSocket::~TransportSocket()
{
stop_send_thread();
stop_io_thread(IOThreadState::Stopped);
m_read_hook_notifier.clear();
}
void TransportSocket::stop_send_thread()
void TransportSocket::stop_io_thread(IOThreadState desired_state)
{
m_send_queue->stop();
if (m_send_thread->needs_to_be_joined())
(void)m_send_thread->join();
VERIFY(desired_state == IOThreadState::Stopped || desired_state == IOThreadState::SendPendingMessagesAndStop);
m_io_thread_state.store(desired_state, AK::MemoryOrder::memory_order_release);
wake_io_thread();
if (m_io_thread && m_io_thread->needs_to_be_joined())
(void)m_io_thread->join();
}
void TransportSocket::set_up_read_hook(Function<void()> hook)
{
Threading::RWLockLocker<Threading::LockMode::Write> lock(m_socket_rw_lock);
VERIFY(m_socket->is_open());
m_socket->on_ready_to_read = move(hook);
m_on_read_hook = move(hook);
m_read_hook_notifier = Core::Notifier::construct(m_notify_hook_read_fd->value(), Core::NotificationType::Read);
m_read_hook_notifier->on_activation = [this] {
VERIFY(m_notify_hook_read_fd);
char buf[64];
(void)Core::System::read(m_notify_hook_read_fd->value(), { buf, sizeof(buf) });
if (m_on_read_hook)
m_on_read_hook();
};
{
Threading::MutexLocker locker(m_incoming_mutex);
if (!m_incoming_messages.is_empty()) {
Array<u8, 1> bytes = { 0 };
MUST(Core::System::write(m_notify_hook_write_fd->value(), bytes));
}
}
}
bool TransportSocket::is_open() const
{
Threading::RWLockLocker<Threading::LockMode::Read> lock(m_socket_rw_lock);
return m_socket->is_open();
}
void TransportSocket::close()
{
Threading::RWLockLocker<Threading::LockMode::Write> lock(m_socket_rw_lock);
stop_io_thread(IOThreadState::Stopped);
m_socket->close();
}
void TransportSocket::close_after_sending_all_pending_messages()
{
stop_send_thread();
auto [bytes, fds] = m_send_queue->peek(NumericLimits<size_t>::max());
ReadonlyBytes remaining_bytes_to_send = bytes;
while (!remaining_bytes_to_send.is_empty() || !fds.is_empty()) {
if (transfer_data(remaining_bytes_to_send, fds) == TransferState::SocketClosed)
break;
}
close();
stop_io_thread(IOThreadState::SendPendingMessagesAndStop);
m_socket->close();
}
void TransportSocket::wait_until_readable()
{
Threading::RWLockLocker<Threading::LockMode::Read> lock(m_socket_rw_lock);
auto maybe_did_become_readable = m_socket->can_read_without_blocking(-1);
if (maybe_did_become_readable.is_error()) {
dbgln("TransportSocket::wait_until_readable: {}", maybe_did_become_readable.error());
warnln("TransportSocket::wait_until_readable: {}", maybe_did_become_readable.error());
VERIFY_NOT_REACHED();
Threading::MutexLocker lock(m_incoming_mutex);
while (m_incoming_messages.is_empty() && m_io_thread_state == IOThreadState::Running) {
m_incoming_cv.wait();
}
VERIFY(maybe_did_become_readable.value());
}
struct MessageHeader {
@@ -175,8 +231,11 @@ void TransportSocket::post_message(Vector<u8> const& bytes_to_write, Vector<Nonn
},
bytes_to_write);
for (auto const& fd : fds)
m_fds_retained_until_received_by_peer.enqueue(fd);
{
Threading::MutexLocker locker(m_fds_retained_until_received_by_peer_mutex);
for (auto const& fd : fds)
m_fds_retained_until_received_by_peer.enqueue(fd);
}
auto raw_fds = Vector<int, 1> {};
if (num_fds_to_transfer > 0) {
@@ -187,6 +246,7 @@ void TransportSocket::post_message(Vector<u8> const& bytes_to_write, Vector<Nonn
}
m_send_queue->enqueue_message(move(message_buffer), move(raw_fds));
wake_io_thread();
}
ErrorOr<void> TransportSocket::send_message(Core::LocalSocket& socket, ReadonlyBytes& bytes_to_write, Vector<int>& unowned_fds)
@@ -220,11 +280,6 @@ TransportSocket::TransferState TransportSocket::transfer_data(ReadonlyBytes& byt
auto byte_count = bytes.size();
auto fd_count = fds.size();
Threading::RWLockLocker<Threading::LockMode::Read> lock(m_socket_rw_lock);
if (!m_socket->is_open())
return TransferState::SocketClosed;
if (auto result = send_message(*m_socket, bytes, fds); result.is_error()) {
if (result.error().is_errno() && result.error().code() == EPIPE) {
// The socket is closed from the other end, we can stop sending.
@@ -240,28 +295,13 @@ TransportSocket::TransferState TransportSocket::transfer_data(ReadonlyBytes& byt
if (written_byte_count > 0 || written_fd_count > 0)
m_send_queue->discard(written_byte_count, written_fd_count);
if (!m_socket->is_open())
return TransferState::SocketClosed;
{
Vector<struct pollfd, 1> pollfds;
pollfds.append({ .fd = m_socket->fd().value(), .events = POLLOUT, .revents = 0 });
ErrorOr<int> result { 0 };
do {
result = Core::System::poll(pollfds, -1);
} while (result.is_error() && result.error().code() == EINTR);
}
return TransferState::Continue;
}
TransportSocket::ShouldShutdown TransportSocket::read_as_many_messages_as_possible_without_blocking(Function<void(Message&&)>&& callback)
void TransportSocket::read_incoming_messages()
{
Threading::RWLockLocker<Threading::LockMode::Read> lock(m_socket_rw_lock);
bool should_shutdown = false;
while (is_open()) {
Vector<NonnullOwnPtr<Message>> batch;
while (m_socket->is_open()) {
u8 buffer[4096];
auto received_fds = Vector<int> {};
auto maybe_bytes_read = m_socket->receive_message({ buffer, 4096 }, MSG_DONTWAIT, received_fds);
@@ -272,7 +312,7 @@ TransportSocket::ShouldShutdown TransportSocket::read_as_many_messages_as_possib
break;
}
if (error.is_errno() && error.code() == ECONNRESET) {
should_shutdown = true;
m_peer_eof = true;
break;
}
@@ -283,7 +323,7 @@ TransportSocket::ShouldShutdown TransportSocket::read_as_many_messages_as_possib
auto bytes_read = maybe_bytes_read.release_value();
if (bytes_read.is_empty() && received_fds.is_empty()) {
should_shutdown = true;
m_peer_eof = true;
break;
}
@@ -304,12 +344,12 @@ TransportSocket::ShouldShutdown TransportSocket::read_as_many_messages_as_possib
break;
if (header.fd_count > m_unprocessed_fds.size())
break;
Message message;
auto message = make<Message>();
received_fd_count += header.fd_count;
for (size_t i = 0; i < header.fd_count; ++i)
message.fds.enqueue(m_unprocessed_fds.dequeue());
message.bytes.append(m_unprocessed_bytes.data() + index + sizeof(MessageHeader), header.payload_size);
callback(move(message));
message->fds.enqueue(m_unprocessed_fds.dequeue());
message->bytes.append(m_unprocessed_bytes.data() + index + sizeof(MessageHeader), header.payload_size);
batch.append(move(message));
} else if (header.type == MessageHeader::Type::FileDescriptorAcknowledgement) {
VERIFY(header.payload_size == 0);
acknowledged_fd_count += header.fd_count;
@@ -319,10 +359,8 @@ TransportSocket::ShouldShutdown TransportSocket::read_as_many_messages_as_possib
index += header.payload_size + sizeof(MessageHeader);
}
if (should_shutdown)
return ShouldShutdown::Yes;
if (acknowledged_fd_count > 0) {
Threading::MutexLocker locker(m_fds_retained_until_received_by_peer_mutex);
while (acknowledged_fd_count > 0) {
(void)m_fds_retained_until_received_by_peer.dequeue();
--acknowledged_fd_count;
@@ -338,6 +376,7 @@ TransportSocket::ShouldShutdown TransportSocket::read_as_many_messages_as_possib
header.type = MessageHeader::Type::FileDescriptorAcknowledgement;
memcpy(message_buffer.data(), &header, sizeof(MessageHeader));
m_send_queue->enqueue_message(move(message_buffer), {});
wake_io_thread();
}
if (index < m_unprocessed_bytes.size()) {
@@ -347,18 +386,44 @@ TransportSocket::ShouldShutdown TransportSocket::read_as_many_messages_as_possib
m_unprocessed_bytes.clear();
}
return ShouldShutdown::No;
auto notify_read_available = [&] {
Array<u8, 1> bytes = { 0 };
(void)Core::System::write(m_notify_hook_write_fd->value(), bytes);
};
if (!batch.is_empty()) {
Threading::MutexLocker locker(m_incoming_mutex);
m_incoming_messages.extend(move(batch));
m_incoming_cv.broadcast();
notify_read_available();
}
if (m_peer_eof) {
m_incoming_cv.broadcast();
notify_read_available();
}
}
TransportSocket::ShouldShutdown TransportSocket::read_as_many_messages_as_possible_without_blocking(Function<void(Message&&)>&& callback)
{
Vector<NonnullOwnPtr<Message>> messages;
{
Threading::MutexLocker locker(m_incoming_mutex);
messages = move(m_incoming_messages);
}
for (auto& message : messages)
callback(move(*message));
return m_peer_eof ? ShouldShutdown::Yes : ShouldShutdown::No;
}
ErrorOr<int> TransportSocket::release_underlying_transport_for_transfer()
{
Threading::RWLockLocker<Threading::LockMode::Write> lock(m_socket_rw_lock);
stop_io_thread(IOThreadState::SendPendingMessagesAndStop);
return m_socket->release_fd();
}
ErrorOr<IPC::File> TransportSocket::clone_for_transfer()
{
Threading::RWLockLocker<Threading::LockMode::Write> lock(m_socket_rw_lock);
return IPC::File::clone_fd(m_socket->fd().value());
}

LibIPC/TransportSocket.h

@@ -21,13 +21,6 @@ namespace IPC {
class SendQueue : public AtomicRefCounted<SendQueue> {
public:
enum class Running {
No,
Yes,
};
Running block_until_message_enqueued();
void stop();
void enqueue_message(Vector<u8>&& bytes, Vector<int>&& fds);
struct BytesAndFds {
Vector<u8> bytes;
@@ -40,8 +33,6 @@ private:
AllocatingMemoryStream m_stream;
Vector<int> m_fds;
Threading::Mutex m_mutex;
Threading::ConditionVariable m_condition { m_mutex };
bool m_running { true };
};
class TransportSocket {
@@ -88,20 +79,41 @@ private:
static ErrorOr<void> send_message(Core::LocalSocket&, ReadonlyBytes& bytes, Vector<int>& unowned_fds);
void stop_send_thread();
enum class IOThreadState {
Running,
SendPendingMessagesAndStop,
Stopped,
};
intptr_t io_thread_loop();
void stop_io_thread(IOThreadState desired_state);
void wake_io_thread();
void read_incoming_messages();
NonnullOwnPtr<Core::LocalSocket> m_socket;
mutable Threading::RWLock m_socket_rw_lock;
ByteBuffer m_unprocessed_bytes;
Queue<File> m_unprocessed_fds;
// After a file descriptor is sent, it is moved to the wait queue until an acknowledgement is received from the peer.
// This is necessary to handle a specific behavior of the macOS kernel, which may prematurely garbage-collect the file
// descriptor contained in the message before the peer receives it. https://openradar.me/9477351
Queue<NonnullRefPtr<AutoCloseFileDescriptor>> m_fds_retained_until_received_by_peer;
Threading::Mutex m_fds_retained_until_received_by_peer_mutex;
RefPtr<Threading::Thread> m_send_thread;
RefPtr<Threading::Thread> m_io_thread;
RefPtr<SendQueue> m_send_queue;
Atomic<IOThreadState> m_io_thread_state { IOThreadState::Running };
Atomic<bool> m_peer_eof { false };
ByteBuffer m_unprocessed_bytes;
Queue<File> m_unprocessed_fds;
Threading::Mutex m_incoming_mutex;
Threading::ConditionVariable m_incoming_cv { m_incoming_mutex };
Vector<NonnullOwnPtr<Message>> m_incoming_messages;
RefPtr<AutoCloseFileDescriptor> m_wakeup_io_thread_read_fd;
RefPtr<AutoCloseFileDescriptor> m_wakeup_io_thread_write_fd;
RefPtr<AutoCloseFileDescriptor> m_notify_hook_read_fd;
RefPtr<AutoCloseFileDescriptor> m_notify_hook_write_fd;
RefPtr<Core::Notifier> m_read_hook_notifier;
Function<void()> m_on_read_hook;
};
}
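
One more note on the retained-fd queue declared above (m_fds_retained_until_received_by_peer): a transferred descriptor is kept open on the sending side until the peer's FileDescriptorAcknowledgement message reports it was received, working around the macOS kernel behavior cited in the comment. Here is a hypothetical, standalone sketch of that retain-until-acknowledged idea in plain C++ (not the LibIPC types):

#include <cstddef>
#include <deque>
#include <mutex>
#include <unistd.h>

// Sketch: keep sent file descriptors open until the peer acknowledges them.
class RetainedFds {
public:
    // Call right after the descriptor has been handed to sendmsg(); keeping it
    // open prevents the kernel from collecting it before the peer picks it up.
    void retain(int fd)
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_fds.push_back(fd);
    }

    // Call when an acknowledgement arrives: `count` descriptors have now been
    // received by the peer and can safely be closed on our side.
    void acknowledge(std::size_t count)
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        for (std::size_t i = 0; i < count && !m_fds.empty(); ++i) {
            close(m_fds.front());
            m_fds.pop_front();
        }
    }

private:
    std::mutex m_mutex;
    std::deque<int> m_fds;
};

In the diff, the acknowledgement is the MessageHeader::Type::FileDescriptorAcknowledgement message the reader queues back to the peer, and m_fds_retained_until_received_by_peer_mutex plays the role of the mutex here, since post_message() and the I/O thread now touch the queue from different threads.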