From 65be940162369e0e20d37a53b97f20f95185ec0d Mon Sep 17 00:00:00 2001 From: IQuant Date: Fri, 30 Aug 2024 20:18:54 +0300 Subject: [PATCH] Proority stuff in world sync --- docs/distributed_world_sync.drawio | 104 +++++++++++++++++-------- noita-proxy/src/net/world.rs | 121 +++++++++++++++++++++++++++-- 2 files changed, 186 insertions(+), 39 deletions(-) diff --git a/docs/distributed_world_sync.drawio b/docs/distributed_world_sync.drawio index fb71d55a..561866f1 100644 --- a/docs/distributed_world_sync.drawio +++ b/docs/distributed_world_sync.drawio @@ -1,113 +1,125 @@ - + - + - + - + - + - + - + - + - + - + - + - + + + + - + - + - + - + - + - + + + + + + + + + + - + - + - + - + - + - + - + - + @@ -116,16 +128,16 @@ - + - + - + - + @@ -133,14 +145,38 @@ - + - + - - + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/noita-proxy/src/net/world.rs b/noita-proxy/src/net/world.rs index 45e5b853..11e2f090 100644 --- a/noita-proxy/src/net/world.rs +++ b/noita-proxy/src/net/world.rs @@ -29,7 +29,7 @@ pub(crate) enum WorldNetMessage { // Authority request RequestAuthority { chunk: ChunkCoord, - priority: u8 + priority: u8, }, // When got authority GotAuthority { @@ -62,6 +62,25 @@ pub(crate) enum WorldNetMessage { ListenAuthorityRelinquished { chunk: ChunkCoord, }, + // Authority transfer stuff (due to priority) + GetAuthorityFrom { + chunk: ChunkCoord, + current_authority: OmniPeerId, + }, + RequestAuthorityTransfer { + chunk: ChunkCoord, + }, + TransferOk { + chunk: ChunkCoord, + chunk_data: Option, + listeners: FxHashSet, + }, + TransferFailed { + chunk: ChunkCoord, + }, + NotifyNewAuthority { + chunk: ChunkCoord, + }, } #[derive(Debug, Decode, Encode)] @@ -79,6 +98,8 @@ enum ChunkState { Authority { listeners: FxHashSet }, /// Chunk is to be cleaned up. UnloadPending, + /// We've requested to take authority from someone else, and waiting for transfer to complete. + Transfer, } impl ChunkState { fn authority() -> ChunkState { @@ -109,6 +130,8 @@ pub(crate) struct WorldManager { /// Update number in which chunk has been updated locally. /// Used to track which chunks can be unloaded. chunk_last_update: FxHashMap, + /// Stores last priority we used for that chunk, in case transfer fails and we'll need to request authority normally. + last_request_priority: FxHashMap, } impl WorldManager { @@ -126,6 +149,7 @@ impl WorldManager { emitted_messages: Default::default(), current_update: 0, chunk_last_update: Default::default(), + last_request_priority: Default::default(), } } @@ -150,7 +174,7 @@ impl WorldManager { fn chunk_updated_locally(&mut self, chunk: ChunkCoord, priority: u8) { let entry = self.chunk_state.entry(chunk).or_insert_with(|| { debug!("Created entry for {chunk:?}"); - ChunkState::RequestAuthority{priority} + ChunkState::RequestAuthority { priority } }); let mut emit_queue = Vec::new(); self.chunk_last_update.insert(chunk, self.current_update); @@ -185,13 +209,14 @@ impl WorldManager { .copied() .unwrap_or_default(); match state { - ChunkState::RequestAuthority{priority} => { + ChunkState::RequestAuthority { priority } => { let priority = *priority; emit_queue.push(( Destination::Host, WorldNetMessage::RequestAuthority { chunk, priority }, )); *state = ChunkState::WaitingForAuthority; + self.last_request_priority.insert(chunk, priority); debug!("Requested authority for {chunk:?}") } // This state doesn't have much to do. @@ -220,6 +245,7 @@ impl WorldManager { } } ChunkState::UnloadPending => {} + ChunkState::Transfer => {} } } @@ -286,6 +312,23 @@ impl WorldManager { ); } + fn emit_transfer_authority( + &mut self, + chunk: ChunkCoord, + source: OmniPeerId, + priority: u8, + current_authority: OmniPeerId, + ) { + self.authority_map.insert(chunk, (source, priority)); + self.emit_msg( + Destination::Peer(source), + WorldNetMessage::GetAuthorityFrom { + chunk, + current_authority, + }, + ); + } + pub(crate) fn handle_msg(&mut self, source: OmniPeerId, msg: WorldNetMessage) { match msg { WorldNetMessage::RequestAuthority { chunk, priority } => { @@ -301,7 +344,7 @@ impl WorldManager { self.emit_got_authority(chunk, source, priority); } else if priority_state > priority { info!("{source} is gaining priority over {chunk:?} from {authority}"); - self.emit_got_authority(chunk, source, priority); + self.emit_transfer_authority(chunk, source, priority, authority); } else { debug!("{source} requested authority for {chunk:?}, but it's already taken by {authority}"); self.emit_msg( @@ -321,6 +364,7 @@ impl WorldManager { self.inbound_model.apply_chunk_data(chunk, chunk_data); } self.chunk_state.insert(chunk, ChunkState::authority()); + self.last_request_priority.remove(&chunk); } WorldNetMessage::RelinquishAuthority { chunk, chunk_data } => { if !self.is_host { @@ -349,6 +393,7 @@ impl WorldManager { Destination::Peer(authority), WorldNetMessage::ListenRequest { chunk }, ); + self.last_request_priority.remove(&chunk); } WorldNetMessage::ListenRequest { chunk } => { let Some(ChunkState::Authority { listeners }) = self.chunk_state.get_mut(&chunk) @@ -395,6 +440,72 @@ impl WorldManager { WorldNetMessage::ListenAuthorityRelinquished { chunk } => { self.chunk_state.insert(chunk, ChunkState::UnloadPending); } + WorldNetMessage::GetAuthorityFrom { + chunk, + current_authority, + } => { + self.chunk_state.insert(chunk, ChunkState::Transfer); + self.emit_msg( + Destination::Peer(current_authority), + WorldNetMessage::RequestAuthorityTransfer { chunk }, + ); + } + WorldNetMessage::RequestAuthorityTransfer { chunk } => { + let state = self.chunk_state.get(&chunk); + if let Some(ChunkState::Authority { listeners }) = state { + let chunk_data = self.outbound_model.get_chunk_data(chunk); + self.emit_msg( + Destination::Peer(source), + WorldNetMessage::TransferOk { + chunk, + chunk_data, + listeners: listeners.clone(), + }, + ); + self.chunk_state.insert(chunk, ChunkState::UnloadPending); + } else { + self.emit_msg( + Destination::Peer(source), + WorldNetMessage::TransferFailed { chunk }, + ); + } + } + WorldNetMessage::TransferOk { + chunk, + chunk_data, + listeners, + } => { + if let Some(chunk_data) = chunk_data { + self.inbound_model.apply_chunk_data(chunk, chunk_data); + } + for listener in listeners.iter() { + self.emit_msg( + Destination::Peer(*listener), + WorldNetMessage::NotifyNewAuthority { chunk }, + ); + } + self.chunk_state + .insert(chunk, ChunkState::Authority { listeners }); + self.last_request_priority.remove(&chunk); + } + WorldNetMessage::TransferFailed { chunk } => { + warn!("Transfer failed, requesting authority normally"); + let priority = self + .last_request_priority + .get(&chunk) + .copied() + .unwrap_or(255); + self.chunk_state + .insert(chunk, ChunkState::RequestAuthority { priority }); + } + WorldNetMessage::NotifyNewAuthority { chunk } => { + let state = self.chunk_state.get_mut(&chunk); + if let Some(ChunkState::Listening { authority }) = state { + *authority = source; + } else { + warn!("Got notified of new authority, but not a listener"); + } + } } } @@ -431,4 +542,4 @@ impl Drop for WorldManager { impl SaveStateEntry for FxHashMap { const FILENAME: &'static str = "world_chunks"; -} \ No newline at end of file +}