/*
 * Copyright (c) 2022, kleines Filmröllchen <filmroellchen@serenityos.org>
 * Copyright (c) 2024, stasoid <stasoid@yahoo.com>
 *
 * SPDX-License-Identifier: BSD-2-Clause
 */
#pragma once

#include <AK/AtomicRefCounted.h>
#include <AK/BuiltinWrappers.h>
#include <AK/ByteString.h>
#include <AK/Debug.h>
#include <AK/Function.h>
#include <LibCore/AnonymousBuffer.h>
namespace Core {
// A circular lock-free queue (or a buffer) with a single producer,
// residing in shared memory and designed to be accessible to multiple processes.
// This implementation makes use of the fact that any producer-related code can be sure that
// it's the only producer-related code that is running, which simplifies a bunch of the synchronization code.
// The exclusivity and liveliness for critical sections in this class is proven to be correct
// under the assumption of correct synchronization primitives, i.e. atomics.
// In many circumstances, this is enough for cross-process queues.
// This class is designed to be transferred over IPC and mmap()ed into multiple processes' memory.
// It is a synthetic pointer to the actual shared memory, which is abstracted away from the user.
// FIXME: Make this independent of shared memory, so that we can move it to AK.
// Size must be a power of two, which speeds up the modulus operations for indexing.
template<typename T, size_t Size = 32>
requires(popcount(Size) == 1)
class SharedSingleProducerCircularQueue final {
public:
    using ValueType = T;

    enum class QueueStatus : u8 {
        Invalid = 0,
        Full,
        Empty,
    };

    SharedSingleProducerCircularQueue() = default;
    SharedSingleProducerCircularQueue(SharedSingleProducerCircularQueue<ValueType, Size>& queue) = default;
    SharedSingleProducerCircularQueue(SharedSingleProducerCircularQueue&& queue) = default;
    SharedSingleProducerCircularQueue& operator=(SharedSingleProducerCircularQueue&& queue) = default;

    // Allocates a new circular queue in shared memory.
    static ErrorOr<SharedSingleProducerCircularQueue<T, Size>> create()
    {
        auto anon_buf = TRY(AnonymousBuffer::create_with_size(sizeof(SharedMemorySPCQ)));
        // Placement-new the queue control block directly into the shared mapping.
        auto shared_queue = new (anon_buf.data<void>()) SharedMemorySPCQ;
        return create_internal(anon_buf, shared_queue);
    }

    // Uses an existing circular queue from given shared memory.
    static ErrorOr<SharedSingleProducerCircularQueue<T, Size>> create(int fd)
    {
        auto anon_buf = TRY(AnonymousBuffer::create_from_anon_fd(fd, sizeof(SharedMemorySPCQ)));
        // The memory was already initialized by the creating process; just reinterpret it.
        auto shared_queue = (SharedMemorySPCQ*)anon_buf.data<void>();
        return create_internal(anon_buf, shared_queue);
    }

    constexpr size_t size() const { return Size; }

    // These functions are provably inconsistent and should only be used as hints to the actual capacity and used count.
    ALWAYS_INLINE size_t weak_remaining_capacity() const { return Size - weak_used(); }
    ALWAYS_INLINE size_t weak_used() const
    {
        // FIXME(review): the original assigned m_tail to a local named `head` and m_head to one
        // named `tail`; the computed value (tail - head = used count) was correct, but the names
        // were swapped. Keep the arithmetic, fix the names.
        auto tail = m_queue->m_queue->m_tail.load(AK::MemoryOrder::memory_order_relaxed);
        auto head = m_queue->m_queue->m_head.load(AK::MemoryOrder::memory_order_relaxed);
        return tail - head;
    }

    ALWAYS_INLINE constexpr int fd() const { return m_queue->fd(); }
    ALWAYS_INLINE constexpr bool is_valid() const { return !m_queue.is_null(); }
    ALWAYS_INLINE constexpr size_t weak_head() const { return m_queue->m_queue->m_head.load(AK::MemoryOrder::memory_order_relaxed); }
    ALWAYS_INLINE constexpr size_t weak_tail() const { return m_queue->m_queue->m_tail.load(AK::MemoryOrder::memory_order_relaxed); }

    // Inserts one element at the tail; only ever called by the single producer,
    // so no synchronization against other producers is needed.
    ErrorOr<void, QueueStatus> enqueue(ValueType to_insert)
    {
        VERIFY(!m_queue.is_null());
        if (!can_enqueue())
            return QueueStatus::Full;
        auto our_tail = m_queue->m_queue->m_tail.load() % Size;
        m_queue->m_queue->m_data[our_tail] = to_insert;
        // Publish the element only after it is fully written (seq_cst atomics enforce the ordering).
        m_queue->m_queue->m_tail.fetch_add(1);
        return {};
    }

    // A queue is full when head - 1 (mod size) and tail point at the same slot; see invariants below.
    ALWAYS_INLINE bool can_enqueue() const
    {
        return ((head() - 1) % Size) != (m_queue->m_queue->m_tail.load() % Size);
    }

    // Repeatedly try to enqueue, using the wait_function to wait if it's not possible
    ErrorOr<void> blocking_enqueue(ValueType to_insert, Function<void()> wait_function)
    {
        ErrorOr<void, QueueStatus> result;
        while (true) {
            result = enqueue(to_insert);
            if (!result.is_error())
                break;
            if (result.error() != QueueStatus::Full)
                return Error::from_string_literal("Unexpected error while enqueuing");
            wait_function();
        }
        return {};
    }

    // Removes and returns the element at the head. Multiple concurrent dequeuers are
    // serialized through m_head_protector (a CAS-based lock holding the claimed head index).
    ErrorOr<ValueType, QueueStatus> dequeue()
    {
        VERIFY(!m_queue.is_null());
        while (true) {
            // This CAS only succeeds if nobody is currently dequeuing.
            auto size_max = NumericLimits<size_t>::max();
            if (m_queue->m_queue->m_head_protector.compare_exchange_strong(size_max, m_queue->m_queue->m_head.load())) {
                auto old_head = m_queue->m_queue->m_head.load();
                // This check looks like it's in a weird place (especially since we have to roll back the protector), but it's actually protecting against a race between multiple dequeuers.
                if (old_head >= m_queue->m_queue->m_tail.load()) {
                    m_queue->m_queue->m_head_protector.store(NumericLimits<size_t>::max(), AK::MemoryOrder::memory_order_release);
                    return QueueStatus::Empty;
                }
                auto data = move(m_queue->m_queue->m_data[old_head % Size]);
                m_queue->m_queue->m_head.fetch_add(1);
                // Release the protector last so other dequeuers observe the advanced head.
                m_queue->m_queue->m_head_protector.store(NumericLimits<size_t>::max(), AK::MemoryOrder::memory_order_release);
                return { move(data) };
            }
        }
    }

    // The "real" head as seen by the outside world. Don't use m_head directly unless you know what you're doing.
    size_t head() const
    {
        // While a dequeue is in flight, the protector holds the claimed (smaller) head index;
        // otherwise it is size_t max and m_head wins the min().
        return min(m_queue->m_queue->m_head.load(), m_queue->m_queue->m_head_protector.load());
    }

private:
    // The control block and data array that actually live in the shared memory region.
    struct SharedMemorySPCQ {
        SharedMemorySPCQ() = default;
        SharedMemorySPCQ(SharedMemorySPCQ const&) = delete;
        SharedMemorySPCQ(SharedMemorySPCQ&&) = delete;
        ~SharedMemorySPCQ() = default;

        // Invariant: tail >= head
        // Invariant: head and tail are monotonically increasing
        // Invariant: tail always points to the next free location where an enqueue can happen.
        // Invariant: head always points to the element to be dequeued next.
        // Invariant: tail is only modified by enqueue functions.
        // Invariant: head is only modified by dequeue functions.
        // An empty queue is signalled with: tail = head
        // A full queue is signalled with:  head - 1 mod size = tail mod size (i.e. head and tail point to the same index in the data array)
        // FIXME: These invariants aren't proven to be correct after each successful completion of each operation where it is relevant.
        //        The work could be put in but for now I think the algorithmic correctness proofs of the functions are enough.
        // Cache-line-align the atomics so producer and consumers don't false-share.
        AK_CACHE_ALIGNED Atomic<size_t, AK::MemoryOrder::memory_order_seq_cst> m_tail { 0 };
        AK_CACHE_ALIGNED Atomic<size_t, AK::MemoryOrder::memory_order_seq_cst> m_head { 0 };
        AK_CACHE_ALIGNED Atomic<size_t, AK::MemoryOrder::memory_order_seq_cst> m_head_protector { NumericLimits<size_t>::max() };

        alignas(ValueType) Array<ValueType, Size> m_data;
    };

    // Ref-counted owner of the shared mapping; destroyed when the last queue handle goes away.
    class RefCountedSharedMemorySPCQ
        : public AtomicRefCounted<RefCountedSharedMemorySPCQ>
        , public AnonymousBuffer {
        friend class SharedSingleProducerCircularQueue;

    public:
        SharedMemorySPCQ* m_queue;
        ByteString m_name;

        ~RefCountedSharedMemorySPCQ()
        {
            dbgln_if(SHARED_QUEUE_DEBUG, "destructed SSPCQ at {:p} named {}, shared mem: {:p}", this, m_name, m_queue);
        }

    private:
        RefCountedSharedMemorySPCQ(AnonymousBuffer anon_buf, SharedMemorySPCQ* shared_queue, ByteString name)
            : AnonymousBuffer(move(anon_buf)) // move, not copy: the by-value parameter is ours to consume
            , m_queue(shared_queue)
            , m_name(move(name))
        {
        }
    };

    // Wraps a mapped control block in a ref-counted handle. Errors if the mapping is null
    // or if the handle allocation fails (the original dereferenced a nothrow-new result unchecked).
    static ErrorOr<SharedSingleProducerCircularQueue<T, Size>> create_internal(AnonymousBuffer anon_buf, SharedMemorySPCQ* shared_queue)
    {
        if (!shared_queue)
            return Error::from_string_literal("Unexpected error when creating shared queue from raw memory");

        auto name = ByteString::formatted("SharedSingleProducerCircularQueue@{:x}", anon_buf.fd());
        dbgln_if(SHARED_QUEUE_DEBUG, "successfully mmapped {} at {:p}", name, shared_queue);

        // adopt_nonnull_ref_or_enomem returns ENOMEM instead of dereferencing a null pointer on allocation failure.
        auto ref_counted = TRY(adopt_nonnull_ref_or_enomem(new (nothrow) RefCountedSharedMemorySPCQ(move(anon_buf), shared_queue, move(name))));
        return SharedSingleProducerCircularQueue<T, Size> { move(ref_counted) };
    }

    SharedSingleProducerCircularQueue(RefPtr<RefCountedSharedMemorySPCQ> queue)
        : m_queue(move(queue))
    {
    }

    RefPtr<RefCountedSharedMemorySPCQ> m_queue;
};
}