/* * Copyright (c) 2024, Andrew Kaster * Copyright (c) 2025, Aliaksandr Kalenik * * SPDX-License-Identifier: BSD-2-Clause */ #include #include #include #include #include #include #include #include #include #include #include #include namespace IPC { ErrorOr> TransportSocket::from_socket(NonnullOwnPtr socket) { return make(move(socket)); } ErrorOr TransportSocket::create_paired() { int fds[2] {}; TRY(Core::System::socketpair(AF_LOCAL, SOCK_STREAM, 0, fds)); ArmedScopeGuard guard_fd_0 { [&] { MUST(Core::System::close(fds[0])); } }; ArmedScopeGuard guard_fd_1 { [&] { MUST(Core::System::close(fds[1])); } }; auto socket0 = TRY(Core::LocalSocket::adopt_fd(fds[0])); guard_fd_0.disarm(); TRY(socket0->set_close_on_exec(true)); TRY(socket0->set_blocking(false)); TRY(Core::System::set_close_on_exec(fds[1], true)); guard_fd_1.disarm(); // Local side gets a full transport; remote side is just a handle containing the raw fd for transfer to another process. return Paired { make(move(socket0)), TransportHandle { File::adopt_fd(fds[1]) }, }; } void SendQueue::enqueue_message(ReadonlyBytes header, ReadonlyBytes payload, Vector&& fds) { Threading::MutexLocker locker(m_mutex); VERIFY(MUST(m_stream.write_some(header)) == header.size()); VERIFY(MUST(m_stream.write_some(payload)) == payload.size()); m_fds.append(fds.data(), fds.size()); } SendQueue::BytesAndFds SendQueue::peek(size_t max_bytes) { Threading::MutexLocker locker(m_mutex); BytesAndFds result; auto bytes_to_send = min(max_bytes, m_stream.used_buffer_size()); result.bytes.resize(bytes_to_send); m_stream.peek_some(result.bytes); if (m_fds.size() > 0) { auto fds_to_send = min(m_fds.size(), Core::LocalSocket::MAX_TRANSFER_FDS); result.fds = Vector { m_fds.span().slice(0, fds_to_send) }; // NOTE: This relies on a subsequent call to discard to actually remove the fds from m_fds } return result; } void SendQueue::discard(size_t bytes_count, size_t fds_count) { Threading::MutexLocker locker(m_mutex); MUST(m_stream.discard(bytes_count)); m_fds.remove(0, fds_count); } TransportSocket::TransportSocket(NonnullOwnPtr socket) : m_socket(move(socket)) { // Disable the socket's built-in notifier. TransportSocket uses its own pipe-based notification mechanism on the IO // thread, so this notifier is unused. Otherwise, when the socket reaches EOF, this notifier is disabled from the IO // thread. In the Qt UI, this causes QSocketNotifier destruction to be deferred. If the socket is closed before the // deferred destruction runs, Qt detects an invalid socket and prints a warning. m_socket->set_notifications_enabled(false); (void)Core::System::setsockopt(m_socket->fd().value(), SOL_SOCKET, SO_SNDBUF, &SOCKET_BUFFER_SIZE, sizeof(SOCKET_BUFFER_SIZE)); (void)Core::System::setsockopt(m_socket->fd().value(), SOL_SOCKET, SO_RCVBUF, &SOCKET_BUFFER_SIZE, sizeof(SOCKET_BUFFER_SIZE)); m_send_queue = adopt_ref(*new SendQueue); auto fds = MUST(Core::System::pipe2(O_CLOEXEC | O_NONBLOCK)); m_wakeup_io_thread_read_fd = adopt_ref(*new AutoCloseFileDescriptor(fds[0])); m_wakeup_io_thread_write_fd = adopt_ref(*new AutoCloseFileDescriptor(fds[1])); { auto fds = MUST(Core::System::pipe2(O_CLOEXEC | O_NONBLOCK)); m_notify_hook_read_fd = adopt_ref(*new AutoCloseFileDescriptor(fds[0])); m_notify_hook_write_fd = adopt_ref(*new AutoCloseFileDescriptor(fds[1])); } m_io_thread = Threading::Thread::construct("IPC IO"sv, [this] { return io_thread_loop(); }); m_io_thread->start(); } intptr_t TransportSocket::io_thread_loop() { Array pollfds; for (;;) { auto want_to_write = [&] { auto [bytes, fds] = m_send_queue->peek(1); return !bytes.is_empty() || !fds.is_empty(); }(); auto state = m_io_thread_state.load(); if (state == IOThreadState::Stopped) break; if (state == IOThreadState::SendPendingMessagesAndStop && !want_to_write) { m_io_thread_state = IOThreadState::Stopped; break; } short events = POLLIN; if (want_to_write) events |= POLLOUT; pollfds[0] = { .fd = m_socket->fd().value(), .events = events, .revents = 0 }; pollfds[1] = { .fd = m_wakeup_io_thread_read_fd->value(), .events = POLLIN, .revents = 0 }; ErrorOr result { 0 }; do { result = Core::System::poll(pollfds, -1); } while (result.is_error() && result.error().code() == EINTR); if (result.is_error()) { dbgln("TransportSocket poll error: {}", result.error()); m_io_thread_state = IOThreadState::Stopped; break; } if (pollfds[1].revents & POLLIN) { char buf[64]; // The wakeup pipe is non-blocking, so EAGAIN is possible if there's a spurious wakeup. (void)Core::System::read(m_wakeup_io_thread_read_fd->value(), { buf, sizeof(buf) }); } if (pollfds[0].revents & POLLIN) read_incoming_messages(); if (pollfds[0].revents & POLLHUP) { m_io_thread_state = IOThreadState::Stopped; break; } if (pollfds[0].revents & (POLLERR | POLLNVAL)) { dbgln("TransportSocket poll: socket error (POLLERR or POLLNVAL)"); m_io_thread_state = IOThreadState::Stopped; break; } if (pollfds[0].revents & POLLOUT) { auto [bytes, fds] = m_send_queue->peek(4096); if (!bytes.is_empty() || !fds.is_empty()) { ReadonlyBytes remaining = bytes; if (transfer_data(remaining, fds) == TransferState::SocketClosed) { m_io_thread_state = IOThreadState::Stopped; } } } } VERIFY(m_io_thread_state == IOThreadState::Stopped); if (!m_is_being_transferred.load(AK::MemoryOrder::memory_order_acquire)) { m_peer_eof = true; m_incoming_cv.broadcast(); notify_read_available(); } return 0; } void TransportSocket::wake_io_thread() { Array bytes = { 0 }; (void)Core::System::write(m_wakeup_io_thread_write_fd->value(), bytes); } TransportSocket::~TransportSocket() { stop_io_thread(IOThreadState::Stopped); m_read_hook_notifier.clear(); } void TransportSocket::stop_io_thread(IOThreadState desired_state) { VERIFY(desired_state == IOThreadState::Stopped || desired_state == IOThreadState::SendPendingMessagesAndStop); m_io_thread_state.store(desired_state, AK::MemoryOrder::memory_order_release); wake_io_thread(); if (m_io_thread && m_io_thread->needs_to_be_joined()) (void)m_io_thread->join(); } void TransportSocket::notify_read_available() { if (!m_notify_hook_write_fd) return; Array bytes = { 0 }; (void)Core::System::write(m_notify_hook_write_fd->value(), bytes); } void TransportSocket::set_up_read_hook(Function hook) { m_on_read_hook = move(hook); m_read_hook_notifier = Core::Notifier::construct(m_notify_hook_read_fd->value(), Core::NotificationType::Read); m_read_hook_notifier->on_activation = [this] { VERIFY(m_notify_hook_read_fd); char buf[64]; (void)Core::System::read(m_notify_hook_read_fd->value(), { buf, sizeof(buf) }); if (m_on_read_hook) m_on_read_hook(); }; { Threading::MutexLocker locker(m_incoming_mutex); if (!m_incoming_messages.is_empty()) { Array bytes = { 0 }; MUST(Core::System::write(m_notify_hook_write_fd->value(), bytes)); } } } bool TransportSocket::is_open() const { return m_socket->is_open(); } void TransportSocket::close() { stop_io_thread(IOThreadState::Stopped); m_socket->close(); } void TransportSocket::close_after_sending_all_pending_messages() { stop_io_thread(IOThreadState::SendPendingMessagesAndStop); m_socket->close(); } void TransportSocket::wait_until_readable() { Threading::MutexLocker lock(m_incoming_mutex); while (m_incoming_messages.is_empty() && m_io_thread_state == IOThreadState::Running) { m_incoming_cv.wait(); } } // Maximum size of accumulated unprocessed bytes before we disconnect the peer static constexpr size_t MAX_UNPROCESSED_BUFFER_SIZE = 128 * MiB; // Maximum number of accumulated unprocessed file descriptors before we disconnect the peer static constexpr size_t MAX_UNPROCESSED_FDS = 512; struct MessageHeader { enum class Type : u8 { Payload = 0, FileDescriptorAcknowledgement = 1, }; Type type { Type::Payload }; u32 payload_size { 0 }; u32 fd_count { 0 }; }; void TransportSocket::post_message(Vector const& bytes_to_write, Vector& attachments) { auto num_fds_to_transfer = attachments.size(); MessageHeader header { .type = MessageHeader::Type::Payload, .payload_size = static_cast(bytes_to_write.size()), .fd_count = static_cast(num_fds_to_transfer), }; auto raw_fds = Vector {}; if (num_fds_to_transfer > 0) { raw_fds.ensure_capacity(num_fds_to_transfer); Threading::MutexLocker locker(m_fds_retained_until_received_by_peer_mutex); for (auto& attachment : attachments) { int fd = attachment.to_fd(); auto auto_fd = adopt_ref(*new AutoCloseFileDescriptor(fd)); raw_fds.unchecked_append(auto_fd->value()); m_fds_retained_until_received_by_peer.enqueue(move(auto_fd)); } } m_send_queue->enqueue_message({ reinterpret_cast(&header), sizeof(header) }, bytes_to_write, move(raw_fds)); wake_io_thread(); } ErrorOr TransportSocket::send_message(Core::LocalSocket& socket, ReadonlyBytes& bytes_to_write, Vector& unowned_fds) { auto num_fds_to_transfer = unowned_fds.size(); while (!bytes_to_write.is_empty()) { ErrorOr maybe_nwritten = 0; if (num_fds_to_transfer > 0) { maybe_nwritten = socket.send_message(bytes_to_write, 0, unowned_fds); } else { maybe_nwritten = socket.write_some(bytes_to_write); } if (maybe_nwritten.is_error()) { if (auto error = maybe_nwritten.release_error(); error.is_errno() && (error.code() == EAGAIN || error.code() == EWOULDBLOCK || error.code() == EINTR)) { return {}; } else { return error; } } bytes_to_write = bytes_to_write.slice(maybe_nwritten.value()); num_fds_to_transfer = 0; unowned_fds.clear(); } return {}; } TransportSocket::TransferState TransportSocket::transfer_data(ReadonlyBytes& bytes, Vector& fds) { auto byte_count = bytes.size(); auto fd_count = fds.size(); if (auto result = send_message(*m_socket, bytes, fds); result.is_error()) { if (result.error().is_errno() && result.error().code() == EPIPE) { // The socket is closed from the other end, we can stop sending. return TransferState::SocketClosed; } dbgln("TransportSocket::send_thread: {}", result.error()); return TransferState::SocketClosed; } auto written_byte_count = byte_count - bytes.size(); auto written_fd_count = fd_count - fds.size(); if (written_byte_count > 0 || written_fd_count > 0) m_send_queue->discard(written_byte_count, written_fd_count); return TransferState::Continue; } void TransportSocket::read_incoming_messages() { Vector> batch; while (m_socket->is_open()) { u8 buffer[4096]; auto received_fds = Vector {}; auto maybe_bytes_read = m_socket->receive_message({ buffer, 4096 }, MSG_DONTWAIT, received_fds); if (maybe_bytes_read.is_error()) { auto error = maybe_bytes_read.release_error(); if (error.is_errno() && error.code() == EAGAIN) { break; } if (error.is_errno() && error.code() == ECONNRESET) { m_peer_eof = true; break; } dbgln("TransportSocket::read_as_much_as_possible_without_blocking: {}", error); warnln("TransportSocket::read_as_much_as_possible_without_blocking: {}", error); m_peer_eof = true; break; } auto bytes_read = maybe_bytes_read.release_value(); if (bytes_read.is_empty() && received_fds.is_empty()) { m_peer_eof = true; break; } if (m_unprocessed_bytes.size() + bytes_read.size() > MAX_UNPROCESSED_BUFFER_SIZE) { dbgln("TransportSocket: Unprocessed buffer would exceed {} bytes, disconnecting peer", MAX_UNPROCESSED_BUFFER_SIZE); m_peer_eof = true; break; } if (m_unprocessed_bytes.try_append(bytes_read.data(), bytes_read.size()).is_error()) { dbgln("TransportSocket: Failed to append to unprocessed_bytes buffer"); m_peer_eof = true; break; } if (m_unprocessed_attachments.size() + received_fds.size() > MAX_UNPROCESSED_FDS) { dbgln("TransportSocket: Unprocessed FDs would exceed {}, disconnecting peer", MAX_UNPROCESSED_FDS); m_peer_eof = true; break; } for (auto const& fd : received_fds) { m_unprocessed_attachments.enqueue(Attachment::from_fd(fd)); } } Checked received_fd_count = 0; Checked acknowledged_fd_count = 0; size_t index = 0; while (index + sizeof(MessageHeader) <= m_unprocessed_bytes.size()) { MessageHeader header; memcpy(&header, m_unprocessed_bytes.data() + index, sizeof(MessageHeader)); if (header.type == MessageHeader::Type::Payload) { if (header.payload_size > MAX_MESSAGE_PAYLOAD_SIZE) { dbgln("TransportSocket: Rejecting message with payload_size {} exceeding limit {}", header.payload_size, MAX_MESSAGE_PAYLOAD_SIZE); m_peer_eof = true; break; } if (header.fd_count > MAX_MESSAGE_FD_COUNT) { dbgln("TransportSocket: Rejecting message with fd_count {} exceeding limit {}", header.fd_count, MAX_MESSAGE_FD_COUNT); m_peer_eof = true; break; } Checked message_size = header.payload_size; message_size += sizeof(MessageHeader); if (message_size.has_overflow() || message_size.value() > m_unprocessed_bytes.size() - index) break; if (header.fd_count > m_unprocessed_attachments.size()) break; auto message = make(); received_fd_count += header.fd_count; if (received_fd_count.has_overflow()) { dbgln("TransportSocket: received_fd_count would overflow"); m_peer_eof = true; break; } for (size_t i = 0; i < header.fd_count; ++i) message->attachments.enqueue(m_unprocessed_attachments.dequeue()); if (message->bytes.try_append(m_unprocessed_bytes.data() + index + sizeof(MessageHeader), header.payload_size).is_error()) { dbgln("TransportSocket: Failed to allocate message buffer for payload_size {}", header.payload_size); m_peer_eof = true; break; } batch.append(move(message)); } else if (header.type == MessageHeader::Type::FileDescriptorAcknowledgement) { if (header.payload_size != 0) { dbgln("TransportSocket: FileDescriptorAcknowledgement with non-zero payload_size {}", header.payload_size); m_peer_eof = true; break; } acknowledged_fd_count += header.fd_count; if (acknowledged_fd_count.has_overflow()) { dbgln("TransportSocket: acknowledged_fd_count would overflow"); m_peer_eof = true; break; } } else { dbgln("TransportSocket: Unknown message header type {}", static_cast(header.type)); m_peer_eof = true; break; } Checked new_index = index; new_index += header.payload_size; new_index += sizeof(MessageHeader); if (new_index.has_overflow()) { dbgln("TransportSocket: index would overflow"); m_peer_eof = true; break; } index = new_index.value(); } if (acknowledged_fd_count > 0u) { Threading::MutexLocker locker(m_fds_retained_until_received_by_peer_mutex); while (acknowledged_fd_count > 0u) { if (m_fds_retained_until_received_by_peer.is_empty()) { dbgln("TransportSocket: Peer acknowledged more FDs than we sent"); m_peer_eof = true; break; } (void)m_fds_retained_until_received_by_peer.dequeue(); --acknowledged_fd_count; } } if (received_fd_count > 0u) { MessageHeader header { .type = MessageHeader::Type::FileDescriptorAcknowledgement, .payload_size = 0, .fd_count = received_fd_count.value(), }; m_send_queue->enqueue_message({ reinterpret_cast(&header), sizeof(header) }, {}, {}); wake_io_thread(); } if (index < m_unprocessed_bytes.size()) { auto remaining = m_unprocessed_bytes.size() - index; m_unprocessed_bytes.overwrite(0, m_unprocessed_bytes.data() + index, remaining); m_unprocessed_bytes.resize(remaining); } else { m_unprocessed_bytes.clear(); } if (!batch.is_empty()) { Threading::MutexLocker locker(m_incoming_mutex); m_incoming_messages.extend(move(batch)); m_incoming_cv.broadcast(); notify_read_available(); } if (m_peer_eof) { m_incoming_cv.broadcast(); notify_read_available(); } } TransportSocket::ShouldShutdown TransportSocket::read_as_many_messages_as_possible_without_blocking(Function&& callback) { Vector> messages; { Threading::MutexLocker locker(m_incoming_mutex); messages = move(m_incoming_messages); } for (auto& message : messages) callback(move(*message)); return m_peer_eof ? ShouldShutdown::Yes : ShouldShutdown::No; } ErrorOr TransportSocket::release_for_transfer() { m_is_being_transferred.store(true, AK::MemoryOrder::memory_order_release); stop_io_thread(IOThreadState::SendPendingMessagesAndStop); auto fd = TRY(m_socket->release_fd()); return TransportHandle { File::adopt_fd(fd) }; } }