make priority switching more reliable with less chance for stuttery chunks

This commit is contained in:
bgkillas 2024-09-02 08:13:01 -04:00
parent 4e941657c3
commit f18769920e

View file

@ -33,6 +33,17 @@ pub(crate) enum WorldNetMessage {
priority: u8,
can_wait: bool
},
// have peer make Authority request
AskForAuthority {
chunk: ChunkCoord,
priority: u8,
},
// switch peer to temp authority
LoseAuthority {
chunk: ChunkCoord,
new_priority: u8,
new_authority: OmniPeerId,
},
// Change priority
ChangePriority {
chunk: ChunkCoord,
@ -67,6 +78,7 @@ pub(crate) enum WorldNetMessage {
ListenUpdate {
delta: ChunkDelta,
priority: u8,
take_auth: bool
},
ListenAuthorityRelinquished {
chunk: ChunkCoord,
@ -104,8 +116,11 @@ enum ChunkState {
priority: u8},
/// Sending chunk updates to these listeners.
Authority { listeners: FxHashSet<OmniPeerId>,
priority: u8 },
/// Chunk is to be cleaned up.
priority: u8,
new_authority: Option<(OmniPeerId, u8)>,
stop_sending: bool
},
/// Chunk is to be cleaned up.
UnloadPending,
/// We've requested to take authority from someone else, and waiting for transfer to complete.
Transfer,
@ -116,7 +131,9 @@ impl ChunkState {
fn authority() -> ChunkState {
ChunkState::Authority {
listeners: Default::default(),
priority: 255
priority: 255,
new_authority: None,
stop_sending: false
}
}
}
@ -203,19 +220,36 @@ impl WorldManager {
ChunkState::Listening { authority, priority: pri } => {
if *pri > priority {
let cs = ChunkState::WantToGetAuth { authority: *authority, auth_priority: *pri, my_priority: priority };
emit_queue.push((
Destination::Peer(*authority),
WorldNetMessage::LoseAuthority {
chunk,
new_priority: priority,
new_authority: self.my_peer_id,
}
));
self.chunk_state.insert(chunk, cs);
}
}
ChunkState::WantToGetAuth { authority, auth_priority: auth_pri, my_priority: my_pri } => {
if *my_pri != priority {
*my_pri = priority
}
if *auth_pri <= priority {
let cs = ChunkState::Listening { authority: *authority, priority: *auth_pri };
self.chunk_state.insert(chunk, cs);
*my_pri = priority;
if *auth_pri <= priority {
let cs = ChunkState::Listening { authority: *authority, priority: *auth_pri };
self.chunk_state.insert(chunk, cs);
} else {
emit_queue.push((
Destination::Peer(*authority),
WorldNetMessage::LoseAuthority {
chunk,
new_priority: priority,
new_authority: self.my_peer_id,
}
));
}
}
}
ChunkState::Authority { listeners, priority: pri } => {
ChunkState::Authority { listeners, priority: pri, new_authority, stop_sending } if !*stop_sending => {
let Some(delta) = self.outbound_model.get_chunk_delta(chunk, false) else {
return;
};
@ -230,15 +264,32 @@ impl WorldManager {
},
));
}
let mut new_auth = None;
if let Some(new) = new_authority {
if new.1 >= priority {
*new_authority = None
} else {
new_auth = Some(new.0)
}
}
let mut new_auth_got = false;
for &listener in listeners.iter() {
let take_auth = new_auth == Some(listener);
if take_auth {
new_auth_got = true
}
emit_queue.push((
Destination::Peer(listener),
WorldNetMessage::ListenUpdate {
delta: delta.clone(),
priority
priority,
take_auth
},
));
}
if new_auth_got && new_auth.is_some() {
*stop_sending = true
}
}
_ => {}
}
@ -293,9 +344,19 @@ impl WorldManager {
*state = ChunkState::UnloadPending;
}
}
ChunkState::Authority { .. } => {
ChunkState::Authority { new_authority, .. } => {
if should_kill(self.my_pos, self.cam_pos, chunk.0, chunk.1)
{
if let Some(new) = new_authority
{
emit_queue.push((
Destination::Peer(new.0),
WorldNetMessage::AskForAuthority {
chunk,
priority: new.1
},
));
}
debug!("Unloading [authority] chunk {chunk:?} (updates: {chunk_last_update} {})", self.current_update);
emit_queue.push((
Destination::Host,
@ -429,6 +490,25 @@ impl WorldManager {
}
}
}
WorldNetMessage::AskForAuthority {chunk, priority} => {
self.emit_msg(
Destination::Host,
WorldNetMessage::RequestAuthority { chunk, priority, can_wait: false },
);
self.chunk_state.insert(chunk, ChunkState::WaitingForAuthority);
}
WorldNetMessage::LoseAuthority {chunk, new_authority, new_priority} => {
if let Some(ChunkState::Authority {new_authority: new_auth, ..}) = self.chunk_state.get_mut(&chunk) {
if let Some(new) = new_auth {
if new.1 > new_priority
{
*new_auth = Some((new_authority, new_priority));
}
} else {
*new_auth = Some((new_authority, new_priority))
}
}
}
WorldNetMessage::ChangePriority { chunk, priority } => {
if !self.is_host {
warn!("{} sent RequestAuthority to not-host.", source);
@ -485,7 +565,7 @@ impl WorldManager {
self.last_request_priority.remove(&chunk);
}
WorldNetMessage::ListenRequest { chunk } => {
let Some(ChunkState::Authority { listeners, priority }) = self.chunk_state.get_mut(&chunk)
let Some(ChunkState::Authority { listeners, priority, .. }) = self.chunk_state.get_mut(&chunk)
else {
//warn!("Can't listen for {chunk:?} - not an authority");
return;
@ -515,7 +595,7 @@ impl WorldManager {
warn!("Initial listen response has None chunk_data. It's generally supposed to have some.");
}
}
WorldNetMessage::ListenUpdate { delta, priority } => {
WorldNetMessage::ListenUpdate { delta, priority, take_auth } => {
match self.chunk_state.get_mut(&delta.chunk_coord)
{
Some(ChunkState::Listening { priority: pri, .. }) =>
@ -526,16 +606,18 @@ impl WorldManager {
{
if priority > *my_priority
{
let rq = WorldNetMessage::RequestAuthority {
chunk: delta.chunk_coord,
priority: *my_priority,
can_wait: false
};
self.emit_msg(
Destination::Host,
rq,
);
self.chunk_state.insert(delta.chunk_coord, ChunkState::WaitingForAuthority);
if take_auth {
let rq = WorldNetMessage::RequestAuthority {
chunk: delta.chunk_coord,
priority: *my_priority,
can_wait: false
};
self.emit_msg(
Destination::Host,
rq,
);
self.chunk_state.insert(delta.chunk_coord, ChunkState::WaitingForAuthority);
}
} else {
let cs = ChunkState::Listening {authority: *authority, priority};
self.chunk_state.insert(delta.chunk_coord, cs);
@ -562,7 +644,7 @@ impl WorldManager {
WorldNetMessage::RequestAuthorityTransfer { chunk } => {
info!("Got a request for authority transfer");
let state = self.chunk_state.get(&chunk);
if let Some(ChunkState::Authority { listeners, priority }) = state {
if let Some(ChunkState::Authority { listeners, priority, .. }) = state {
let chunk_data = self.outbound_model.get_chunk_data(chunk);
self.emit_msg(
Destination::Peer(source),
@ -598,7 +680,7 @@ impl WorldManager {
);
}
self.chunk_state
.insert(chunk, ChunkState::Authority { listeners, priority });
.insert(chunk, ChunkState::Authority { listeners, priority, new_authority: None, stop_sending: false });
self.last_request_priority.remove(&chunk);
}
WorldNetMessage::TransferFailed { chunk } => {