[MP] Implement "watched" properties.

Checked at "delta_interval" (default = every frame), synchronized
(reliably) if changes are detected.
This commit is contained in:
Fabio Alessandrelli 2023-03-28 09:30:58 +02:00
parent f581f21dd6
commit f1e0d50841
13 changed files with 422 additions and 50 deletions

View file

@ -138,15 +138,16 @@ void SceneReplicationInterface::on_network_process() {
spawn_queue.clear();
}
// Process timed syncs.
uint64_t msec = OS::get_singleton()->get_ticks_msec();
// Process syncs.
uint64_t usec = OS::get_singleton()->get_ticks_usec();
for (KeyValue<int, PeerInfo> &E : peers_info) {
const HashSet<ObjectID> to_sync = E.value.sync_nodes;
if (to_sync.is_empty()) {
continue; // Nothing to sync
}
uint16_t sync_net_time = ++E.value.last_sent_sync;
_send_sync(E.key, to_sync, sync_net_time, msec);
_send_sync(E.key, to_sync, sync_net_time, usec);
_send_delta(E.key, to_sync, usec, E.value.last_watch_usecs);
}
}
@ -280,6 +281,7 @@ Error SceneReplicationInterface::on_replication_stop(Object *p_obj, Variant p_co
sync_nodes.erase(sid);
for (KeyValue<int, PeerInfo> &E : peers_info) {
E.value.sync_nodes.erase(sid);
E.value.last_watch_usecs.erase(sid);
if (sync->get_net_id()) {
E.value.recv_sync_ids.erase(sync->get_net_id());
}
@ -357,6 +359,7 @@ Error SceneReplicationInterface::_update_sync_visibility(int p_peer, Multiplayer
E.value.sync_nodes.insert(sid);
} else {
E.value.sync_nodes.erase(sid);
E.value.last_watch_usecs.erase(sid);
}
}
return OK;
@ -369,6 +372,7 @@ Error SceneReplicationInterface::_update_sync_visibility(int p_peer, Multiplayer
peers_info[p_peer].sync_nodes.insert(sid);
} else {
peers_info[p_peer].sync_nodes.erase(sid);
peers_info[p_peer].last_watch_usecs.erase(sid);
}
return OK;
}
@ -670,8 +674,126 @@ Error SceneReplicationInterface::on_despawn_receive(int p_from, const uint8_t *p
return OK;
}
void SceneReplicationInterface::_send_sync(int p_peer, const HashSet<ObjectID> p_synchronizers, uint16_t p_sync_net_time, uint64_t p_msec) {
MAKE_ROOM(sync_mtu);
bool SceneReplicationInterface::_verify_synchronizer(int p_peer, MultiplayerSynchronizer *p_sync, uint32_t &r_net_id) {
r_net_id = p_sync->get_net_id();
if (r_net_id == 0 || (r_net_id & 0x80000000)) {
int path_id = 0;
bool verified = multiplayer->get_path_cache()->send_object_cache(p_sync, p_peer, path_id);
ERR_FAIL_COND_V_MSG(path_id < 0, false, "This should never happen!");
if (r_net_id == 0) {
// First time path based ID.
r_net_id = path_id | 0x80000000;
p_sync->set_net_id(r_net_id | 0x80000000);
}
return verified;
}
return true;
}
MultiplayerSynchronizer *SceneReplicationInterface::_find_synchronizer(int p_peer, uint32_t p_net_id) {
MultiplayerSynchronizer *sync = nullptr;
if (p_net_id & 0x80000000) {
sync = Object::cast_to<MultiplayerSynchronizer>(multiplayer->get_path_cache()->get_cached_object(p_peer, p_net_id & 0x7FFFFFFF));
} else if (peers_info[p_peer].recv_sync_ids.has(p_net_id)) {
const ObjectID &sid = peers_info[p_peer].recv_sync_ids[p_net_id];
sync = get_id_as<MultiplayerSynchronizer>(sid);
}
return sync;
}
void SceneReplicationInterface::_send_delta(int p_peer, const HashSet<ObjectID> p_synchronizers, uint64_t p_usec, const HashMap<ObjectID, uint64_t> p_last_watch_usecs) {
MAKE_ROOM(/* header */ 1 + /* element */ 4 + 8 + 4 + delta_mtu);
uint8_t *ptr = packet_cache.ptrw();
ptr[0] = SceneMultiplayer::NETWORK_COMMAND_SYNC | (1 << SceneMultiplayer::CMD_FLAG_0_SHIFT);
int ofs = 1;
for (const ObjectID &oid : p_synchronizers) {
MultiplayerSynchronizer *sync = get_id_as<MultiplayerSynchronizer>(oid);
ERR_CONTINUE(!sync || !sync->get_replication_config().is_valid() || !sync->is_multiplayer_authority());
uint32_t net_id;
if (!_verify_synchronizer(p_peer, sync, net_id)) {
continue;
}
uint64_t last_usec = p_last_watch_usecs.has(oid) ? p_last_watch_usecs[oid] : 0;
uint64_t indexes;
List<Variant> delta = sync->get_delta_state(p_usec, last_usec, indexes);
if (!delta.size()) {
continue; // Nothing to update.
}
Vector<const Variant *> varp;
varp.resize(delta.size());
const Variant **vptr = varp.ptrw();
int i = 0;
for (const Variant &v : delta) {
vptr[i] = &v;
}
int size;
Error err = MultiplayerAPI::encode_and_compress_variants(vptr, varp.size(), nullptr, size);
ERR_CONTINUE_MSG(err != OK, "Unable to encode delta state.");
ERR_CONTINUE_MSG(size > delta_mtu, vformat("Synchronizer delta bigger than MTU will not be sent (%d > %d): %s", size, delta_mtu, sync->get_path()));
if (ofs + 4 + 8 + 4 + size > delta_mtu) {
// Send what we got, and reset write.
_send_raw(packet_cache.ptr(), ofs, p_peer, true);
ofs = 1;
}
if (size) {
ofs += encode_uint32(sync->get_net_id(), &ptr[ofs]);
ofs += encode_uint64(indexes, &ptr[ofs]);
ofs += encode_uint32(size, &ptr[ofs]);
MultiplayerAPI::encode_and_compress_variants(vptr, varp.size(), &ptr[ofs], size);
ofs += size;
}
#ifdef DEBUG_ENABLED
_profile_node_data("delta_out", oid, size);
#endif
peers_info[p_peer].last_watch_usecs[oid] = p_usec;
}
if (ofs > 1) {
// Got some left over to send.
_send_raw(packet_cache.ptr(), ofs, p_peer, true);
}
}
Error SceneReplicationInterface::on_delta_receive(int p_from, const uint8_t *p_buffer, int p_buffer_len) {
int ofs = 1;
while (ofs + 4 + 8 + 4 < p_buffer_len) {
uint32_t net_id = decode_uint32(&p_buffer[ofs]);
ofs += 4;
uint64_t indexes = decode_uint64(&p_buffer[ofs]);
ofs += 8;
uint32_t size = decode_uint32(&p_buffer[ofs]);
ofs += 4;
ERR_FAIL_COND_V(size > uint32_t(p_buffer_len - ofs), ERR_INVALID_DATA);
MultiplayerSynchronizer *sync = _find_synchronizer(p_from, net_id);
Node *node = sync ? sync->get_root_node() : nullptr;
if (!sync || sync->get_multiplayer_authority() != p_from || !node) {
ofs += size;
ERR_CONTINUE_MSG(true, "Ignoring delta for non-authority or invalid synchronizer.");
}
List<NodePath> props = sync->get_delta_properties(indexes);
ERR_FAIL_COND_V(props.size() == 0, ERR_INVALID_DATA);
Vector<Variant> vars;
vars.resize(props.size());
int consumed = 0;
Error err = MultiplayerAPI::decode_and_decompress_variants(vars, p_buffer + ofs, size, consumed);
ERR_FAIL_COND_V(err != OK, err);
ERR_FAIL_COND_V(uint32_t(consumed) != size, ERR_INVALID_DATA);
err = MultiplayerSynchronizer::set_state(props, node, vars);
ERR_FAIL_COND_V(err != OK, err);
ofs += size;
sync->emit_signal(SNAME("delta_synchronized"));
#ifdef DEBUG_ENABLED
_profile_node_data("delta_in", sync->get_instance_id(), size);
#endif
}
return OK;
}
void SceneReplicationInterface::_send_sync(int p_peer, const HashSet<ObjectID> p_synchronizers, uint16_t p_sync_net_time, uint64_t p_usec) {
MAKE_ROOM(/* header */ 3 + /* element */ 4 + 4 + sync_mtu);
uint8_t *ptr = packet_cache.ptrw();
ptr[0] = SceneMultiplayer::NETWORK_COMMAND_SYNC;
int ofs = 1;
@ -681,26 +803,16 @@ void SceneReplicationInterface::_send_sync(int p_peer, const HashSet<ObjectID> p
for (const ObjectID &oid : p_synchronizers) {
MultiplayerSynchronizer *sync = get_id_as<MultiplayerSynchronizer>(oid);
ERR_CONTINUE(!sync || !sync->get_replication_config().is_valid() || !sync->is_multiplayer_authority());
if (!sync->update_outbound_sync_time(p_msec)) {
if (!sync->update_outbound_sync_time(p_usec)) {
continue; // nothing to sync.
}
Node *node = sync->get_root_node();
ERR_CONTINUE(!node);
uint32_t net_id = sync->get_net_id();
if (net_id == 0 || (net_id & 0x80000000)) {
int path_id = 0;
bool verified = multiplayer->get_path_cache()->send_object_cache(sync, p_peer, path_id);
ERR_CONTINUE_MSG(path_id < 0, "This should never happen!");
if (net_id == 0) {
// First time path based ID.
net_id = path_id | 0x80000000;
sync->set_net_id(net_id | 0x80000000);
}
if (!verified) {
// The path based sync is not yet confirmed, skipping.
continue;
}
if (!_verify_synchronizer(p_peer, sync, net_id)) {
// The path based sync is not yet confirmed, skipping.
continue;
}
int size;
Vector<Variant> vars;
@ -711,7 +823,7 @@ void SceneReplicationInterface::_send_sync(int p_peer, const HashSet<ObjectID> p
err = MultiplayerAPI::encode_and_compress_variants(varp.ptrw(), varp.size(), nullptr, size);
ERR_CONTINUE_MSG(err != OK, "Unable to encode sync state.");
// TODO Handle single state above MTU.
ERR_CONTINUE_MSG(size > 3 + 4 + 4 + sync_mtu, vformat("Node states bigger then MTU will not be sent (%d > %d): %s", size, sync_mtu, node->get_path()));
ERR_CONTINUE_MSG(size > sync_mtu, vformat("Node states bigger than MTU will not be sent (%d > %d): %s", size, sync_mtu, node->get_path()));
if (ofs + 4 + 4 + size > sync_mtu) {
// Send what we got, and reset write.
_send_raw(packet_cache.ptr(), ofs, p_peer, false);
@ -735,6 +847,10 @@ void SceneReplicationInterface::_send_sync(int p_peer, const HashSet<ObjectID> p
Error SceneReplicationInterface::on_sync_receive(int p_from, const uint8_t *p_buffer, int p_buffer_len) {
ERR_FAIL_COND_V_MSG(p_buffer_len < 11, ERR_INVALID_DATA, "Invalid sync packet received");
bool is_delta = (p_buffer[0] & (1 << SceneMultiplayer::CMD_FLAG_0_SHIFT)) != 0;
if (is_delta) {
return on_delta_receive(p_from, p_buffer, p_buffer_len);
}
uint16_t time = decode_uint16(&p_buffer[1]);
int ofs = 3;
while (ofs + 8 < p_buffer_len) {
@ -743,13 +859,7 @@ Error SceneReplicationInterface::on_sync_receive(int p_from, const uint8_t *p_bu
uint32_t size = decode_uint32(&p_buffer[ofs]);
ofs += 4;
ERR_FAIL_COND_V(size > uint32_t(p_buffer_len - ofs), ERR_INVALID_DATA);
MultiplayerSynchronizer *sync = nullptr;
if (net_id & 0x80000000) {
sync = Object::cast_to<MultiplayerSynchronizer>(multiplayer->get_path_cache()->get_cached_object(p_from, net_id & 0x7FFFFFFF));
} else if (peers_info[p_from].recv_sync_ids.has(net_id)) {
const ObjectID &sid = peers_info[p_from].recv_sync_ids[net_id];
sync = get_id_as<MultiplayerSynchronizer>(sid);
}
MultiplayerSynchronizer *sync = _find_synchronizer(p_from, net_id);
if (!sync) {
// Not received yet.
ofs += size;
@ -782,3 +892,21 @@ Error SceneReplicationInterface::on_sync_receive(int p_from, const uint8_t *p_bu
}
return OK;
}
void SceneReplicationInterface::set_max_sync_packet_size(int p_size) {
ERR_FAIL_COND_MSG(p_size < 128, "Sync maximum packet size must be at least 128 bytes.");
sync_mtu = p_size;
}
int SceneReplicationInterface::get_max_sync_packet_size() const {
return sync_mtu;
}
void SceneReplicationInterface::set_max_delta_packet_size(int p_size) {
ERR_FAIL_COND_MSG(p_size < 128, "Sync maximum packet size must be at least 128 bytes.");
delta_mtu = p_size;
}
int SceneReplicationInterface::get_max_delta_packet_size() const {
return delta_mtu;
}