Packet compression, cleaner sending to websocket

This commit is contained in:
IQuant 2024-05-04 19:04:09 +03:00
parent c1d8472b01
commit e0fb138f44
5 changed files with 83 additions and 41 deletions

View file

@ -1125,6 +1125,12 @@ version = "0.4.21"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "90ed8c1e510134f979dbc4f070f87d4313098b704861a105fe34231c70a3901c" checksum = "90ed8c1e510134f979dbc4f070f87d4313098b704861a105fe34231c70a3901c"
[[package]]
name = "lz4_flex"
version = "0.11.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "75761162ae2b0e580d7e7c390558127e5f01b4194debd6221fd8c207fc80e3f5"
[[package]] [[package]]
name = "malloc_buf" name = "malloc_buf"
version = "0.0.6" version = "0.0.6"
@ -1220,6 +1226,7 @@ version = "0.1.0"
dependencies = [ dependencies = [
"bitcode", "bitcode",
"eframe", "eframe",
"lz4_flex",
"serde", "serde",
"tangled", "tangled",
"tracing", "tracing",

View file

@ -16,6 +16,7 @@ tracing = "0.1.40"
tangled = { path = "tangled" } tangled = { path = "tangled" }
serde = { version = "1.0.199", features = ["serde_derive"] } serde = { version = "1.0.199", features = ["serde_derive"] }
bitcode = "0.6.0" bitcode = "0.6.0"
lz4_flex = { version = "0.11.3", default_features = false, features = ["std"]}
[profile.dev] [profile.dev]
opt-level = 1 opt-level = 1

View file

@ -1,7 +1,7 @@
use std::{ use std::{
fmt::Display, fmt::Display,
io::{self, Write}, io::{self, Write},
net::{SocketAddr, TcpListener}, net::{SocketAddr, TcpListener, TcpStream},
sync::{atomic::AtomicBool, Arc, Mutex}, sync::{atomic::AtomicBool, Arc, Mutex},
thread, thread,
time::Duration, time::Duration,
@ -11,7 +11,7 @@ use bitcode::{Decode, Encode};
use eframe::egui::{self, Color32}; use eframe::egui::{self, Color32};
use tangled::{Peer, PeerId, Reliability}; use tangled::{Peer, PeerId, Reliability};
use tracing::{error, info, warn}; use tracing::{error, info, warn};
use tungstenite::accept; use tungstenite::{accept, WebSocket};
use crate::messages::NetMsg; use crate::messages::NetMsg;
@ -39,6 +39,21 @@ fn ws_encode_mod(peer: PeerId, data: &[u8]) -> tungstenite::Message {
tungstenite::Message::Binary(buf) tungstenite::Message::Binary(buf)
} }
struct NetInnerState {
ws: Option<WebSocket<TcpStream>>,
}
impl NetInnerState {
fn try_ws_write(&mut self, data: tungstenite::Message) {
if let Some(ws) = &mut self.ws {
if let Err(err) = ws.write(data) {
error!("Error occured while sending to websocket: {}", err);
self.ws = None;
};
}
}
}
struct NetManager { struct NetManager {
peer: tangled::Peer, peer: tangled::Peer,
settings: Mutex<GameSettings>, settings: Mutex<GameSettings>,
@ -80,16 +95,15 @@ impl NetManager {
.set_nonblocking(true) .set_nonblocking(true)
.expect("can set nonblocking"); .expect("can set nonblocking");
let mut websocket = None; let mut state = NetInnerState { ws: None };
while self while self
.continue_running .continue_running
.load(std::sync::atomic::Ordering::Relaxed) .load(std::sync::atomic::Ordering::Relaxed)
{ {
self.local_connected self.local_connected
.store(websocket.is_some(), std::sync::atomic::Ordering::Relaxed); .store(state.ws.is_some(), std::sync::atomic::Ordering::Relaxed);
if websocket.is_none() if state.ws.is_none() && self.accept_local.load(std::sync::atomic::Ordering::SeqCst)
&& self.accept_local.load(std::sync::atomic::Ordering::SeqCst)
{ {
thread::sleep(Duration::from_millis(10)); thread::sleep(Duration::from_millis(10));
if let Ok((stream, addr)) = local_server.accept() { if let Ok((stream, addr)) = local_server.accept() {
@ -99,28 +113,26 @@ impl NetManager {
.set_read_timeout(Some(Duration::from_millis(1))) .set_read_timeout(Some(Duration::from_millis(1)))
.expect("can set read timeout"); .expect("can set read timeout");
websocket = accept(stream).ok(); state.ws = accept(stream).ok();
if websocket.is_some() { if state.ws.is_some() {
info!("New stream connected"); info!("New stream connected");
if let Some(ws) = &mut websocket {
let settings = self.settings.lock().unwrap(); let settings = self.settings.lock().unwrap();
ws.write(ws_encode_proxy("seed", settings.seed)).ok(); state.try_ws_write(ws_encode_proxy("seed", settings.seed));
ws.write(ws_encode_proxy( state.try_ws_write(ws_encode_proxy(
"peer_id", "peer_id",
self.peer.my_id().expect("Has peer id at this point"), self.peer.my_id().expect("Has peer id at this point"),
)) ));
.ok(); state.try_ws_write(ws_encode_proxy("name", "test_name"));
ws.write(ws_encode_proxy("name", "test_name")).ok(); state.try_ws_write(ws_encode_proxy("ready", ""));
ws.write(ws_encode_proxy("ready", "")).ok(); // TODO?
// TODO? for id in self.peer.iter_peer_ids() {
for id in self.peer.iter_peer_ids() { state.try_ws_write(ws_encode_proxy("join", id));
ws.write(ws_encode_proxy("join", id)).ok();
}
} }
} }
} }
} }
if let Some(ws) = &mut websocket { if let Some(ws) = &mut state.ws {
ws.flush().ok(); ws.flush().ok();
} }
for net_event in self.peer.recv() { for net_event in self.peer.recv() {
@ -137,14 +149,10 @@ impl NetManager {
tangled::Reliability::Reliable, tangled::Reliability::Reliable,
); );
} }
if let Some(ws) = &mut websocket { state.try_ws_write(ws_encode_proxy("join", id));
ws.write(ws_encode_proxy("join", id)).ok();
}
} }
tangled::NetworkEvent::PeerDisconnected(id) => { tangled::NetworkEvent::PeerDisconnected(id) => {
if let Some(ws) = &mut websocket { state.try_ws_write(ws_encode_proxy("leave", id));
ws.write(ws_encode_proxy("leave", id)).ok();
}
} }
tangled::NetworkEvent::Message(msg) => { tangled::NetworkEvent::Message(msg) => {
let Ok(net_msg) = bitcode::decode::<NetMsg>(&msg.data) else { let Ok(net_msg) = bitcode::decode::<NetMsg>(&msg.data) else {
@ -158,33 +166,49 @@ impl NetManager {
.store(true, std::sync::atomic::Ordering::SeqCst); .store(true, std::sync::atomic::Ordering::SeqCst);
} }
NetMsg::ModRaw { data } => { NetMsg::ModRaw { data } => {
if let Some(ws) = &mut websocket { state.try_ws_write(ws_encode_mod(msg.src, &data));
if let Err(err) = ws.write(ws_encode_mod(msg.src, &data)) { }
error!( NetMsg::ModCompressed { data } => {
"Error occured while sending to websocket: {}", if let Ok(decompressed) =
err lz4_flex::decompress_size_prepended(&data)
); {
websocket = None; state.try_ws_write(ws_encode_mod(msg.src, &decompressed));
};
} }
} }
} }
} }
} }
} }
if let Some(ws) = &mut websocket { if let Some(ws) = &mut state.ws {
let msg = ws.read(); let msg = ws.read();
match msg { match msg {
Ok(msg) => match msg { Ok(msg) => match msg {
tungstenite::Message::Binary(msg) => self tungstenite::Message::Binary(msg) => {
.broadcast(&NetMsg::ModRaw { data: msg }, Reliability::Unreliable), if msg.len() > 100 {
let compressed = lz4_flex::compress_prepend_size(&msg);
info!(
"Compressed {} bytes to {} bytes",
msg.len(),
compressed.len()
);
self.broadcast(
&NetMsg::ModCompressed { data: compressed },
Reliability::Unreliable,
);
} else {
self.broadcast(
&NetMsg::ModRaw { data: msg },
Reliability::Unreliable,
)
}
}
_ => {} _ => {}
}, },
Err(tungstenite::Error::Io(io_err)) Err(tungstenite::Error::Io(io_err))
if io_err.kind() == io::ErrorKind::WouldBlock => {} if io_err.kind() == io::ErrorKind::WouldBlock => {}
Err(err) => { Err(err) => {
error!("Error occured while reading from websocket: {}", err); error!("Error occured while reading from websocket: {}", err);
websocket = None; state.ws = None;
} }
} }
} }

View file

@ -6,4 +6,5 @@ use crate::GameSettings;
pub enum NetMsg { pub enum NetMsg {
StartGame { settings: GameSettings }, StartGame { settings: GameSettings },
ModRaw { data: Vec<u8> }, ModRaw { data: Vec<u8> },
ModCompressed { data: Vec<u8> },
} }

9
todo.txt Normal file
View file

@ -0,0 +1,9 @@
- Синхронизация перков
- Лимит на длину сообщения
- Улучшеная синхронизация инвентаря и текущего предмета
- reliability
+ Сжатие пакетов
- Общее хп
- Синхронизация противников
- Перекидывание предметов
- Синхронизация мира